[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355012#comment-16355012 ]

Prasanna Subburaj commented on KAFKA-6490:
------------------------------------------

[~ewencp]: Thanks for giving me permissions. I am interested in working on this improvement, and yes, we need to discuss the dead letter queue further. After creating the page I will start the discussion thread on the mailing list.

> JSON SerializationException Stops Connect
> -----------------------------------------
>
>                 Key: KAFKA-6490
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6490
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: William R. Speirs
>            Assignee: Prasanna Subburaj
>            Priority: Major
>         Attachments: KAFKA-6490_v1.patch
>
> If you configure Kafka Connect to parse JSON messages and you send it a non-JSON message, the SerializationException will bubble up to the top and stop Kafka Connect. While I understand that sending non-JSON to a JSON serializer is a bad idea, I think that a single malformed message stopping all of Kafka Connect is even worse.
> The data exception is thrown here:
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
> from the call here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and Kafka Connect simply stops with the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in WorkerSinkTask's {{convertMessages}} so messages that don't properly parse are logged but simply ignored? That way Kafka Connect can keep working even when it encounters a message it cannot decode.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
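The issue above suggests wrapping each record's conversion in a {{try/catch}}. A minimal, hypothetical sketch of that skip-and-log loop is below; it is not the actual WorkerSinkTask code, and the converter is simulated with a naive stand-in that throws on non-JSON input the way JsonConverter throws SerializationException:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ConvertSkipSketch {

    // Stand-in for a converter that throws on malformed input,
    // analogous to JsonConverter throwing SerializationException.
    static String convert(String raw) {
        String s = raw.trim();
        if (!(s.startsWith("{") && s.endsWith("}"))) {
            throw new RuntimeException("malformed record: " + raw);
        }
        return s;
    }

    // The suggested change: catch per record, so one bad message is
    // logged and skipped instead of killing the whole task.
    static List<String> convertMessages(List<String> raw) {
        List<String> converted = new ArrayList<>();
        for (String msg : raw) {
            try {
                converted.add(convert(msg));
            } catch (RuntimeException e) {
                // In Connect this would go through the task's logger.
                System.err.println("Skipping unparseable record: " + e.getMessage());
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        List<String> out = convertMessages(Arrays.asList("{\"a\":1}", "not json", "{\"b\":2}"));
        System.out.println(out.size() + " records converted"); // 2 records converted
    }
}
```

As the later comments note, silently discarding is only one policy among several; the sketch only illustrates the mechanics of containing the exception.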
[jira] [Assigned] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prasanna Subburaj reassigned KAFKA-6490:
----------------------------------------

    Assignee: Prasanna Subburaj
[jira] [Commented] (KAFKA-6533) Kafka log cleaner stopped due to "cannot allocate memory" error
[ https://issues.apache.org/jira/browse/KAFKA-6533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354994#comment-16354994 ]

huxihx commented on KAFKA-6533:
-------------------------------

[~law1] There seems to be no way to restart only the background thread. "Cannot allocate memory" means there is insufficient memory for the Java Runtime Environment to continue.

> Kafka log cleaner stopped due to "cannot allocate memory" error
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6533
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6533
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.0
>            Reporter: Law
>            Priority: Major
>
> Hi,
> I am on Kafka 0.10.2.0 and have an issue where the log cleaner is running okay but suddenly stops because of a "cannot allocate memory" error.
> Here is the error from the log-cleaner.log file:
> [2018-02-04 02:57:41,343] INFO [kafka-log-cleaner-thread-0],
>     Log cleaner thread 0 cleaned log __consumer_offsets-35 (dirty section = [31740820448, 31740820448])
>     100.1 MB of log processed in 1.5 seconds (67.5 MB/sec).
>     Indexed 100.0 MB in 0.8 seconds (131.8 Mb/sec, 51.2% of total time)
>     Buffer utilization: 0.0%
>     Cleaned 100.1 MB in 0.7 seconds (138.2 Mb/sec, 48.8% of total time)
>     Start size: 100.1 MB (771,501 messages)
>     End size: 0.1 MB (501 messages)
>     99.9% size reduction (99.9% fewer messages)
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,348] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,348] INFO Cleaner 0: Building offset map for __consumer_offsets-15... (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,359] INFO Cleaner 0: Building offset map for log __consumer_offsets-15 for 1 segments in offset range [19492717509, 19493524087). (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,067] INFO Cleaner 0: Offset map for log __consumer_offsets-15 complete.
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,067] INFO Cleaner 0: Cleaning log __consumer_offsets-15 (cleaning prior to Sun Feb 04 02:57:34 GMT 2018, discarding tombstones prior to Sat Feb 03 02:53:31 GMT 2018)... (k
> [2018-02-04 02:57:42,068] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-15 (largest timestamp Sat Sep 02 15:26:15 GMT 2017) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,078] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,078] INFO Cleaner 0: Cleaning segment 2148231985 in log __consumer_offsets-15 (largest timestamp Thu Sep 28 15:50:19 GMT 2017) into 2148231985, discarding deletes. (kafka.
> [2018-02-04 02:57:42,080] INFO Cleaner 0: Swapping in cleaned segment 2148231985 for segment(s) 2148231985 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,081] INFO Cleaner 0: Cleaning segment 4296532622 in log __consumer_offsets-15 (largest timestamp Tue Oct 24 10:33:20 GMT 2017) into 4296532622, discarding deletes. (kafka.
> [2018-02-04 02:57:42,083] INFO Cleaner 0: Swapping in cleaned segment 4296532622 for segment(s) 4296532622 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,083] INFO Cleaner 0: Cleaning segment 6444525822 in log __consumer_offsets-15 (largest timestamp Mon Nov 20 11:33:30 GMT 2017) into 6444525822, discarding deletes. (kafka.
> [2018-02-04 02:57:42,085] INFO Cleaner 0: Swapping in cleaned segment 6444525822 for segment(s) 6444525822 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,086] INFO Cleaner 0: Cleaning segment 8592045249 in log __consumer_offsets-15 (largest timestamp Sat Dec 16 06:35:53 GMT 2017) into 8592045249, discarding deletes. (kafka.
> [2018-02-04 02:57:42,088] INFO Cleaner 0: Swapping in cleaned segment 8592045249 for segment(s) 8592045249 in log __consumer_offsets-15.
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,088] INFO Cleaner 0: Cleaning segment 10739582585 in log __consumer_offsets-15 (largest timestamp Wed Dec 27 21:15:44 GMT 2017) into 10739582585, discarding deletes. (kafk
> [2018-02-04 02:57:42,091] INFO Cleaner 0: Swapping in cleaned segment 10739582585 for segment(s) 10739582585 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,096] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.io.FileNotFoundException: /kafka/broker1-logs/__consumer_offsets-15/012887210320.log.cleaned (Cannot allocate memory)
>         at java.io.RandomAccessFile.open0(Native Method)
>         at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
>         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
>         at
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354959#comment-16354959 ]

Ewen Cheslack-Postava commented on KAFKA-6490:
----------------------------------------------

[~prasanna1433] I've given you wiki permissions; you should be able to create a page now.

The dead letter queue is something I've specifically heard requested from a number of users, so it's definitely in demand. The list I gave is based on a ton of real user feedback, so I feel pretty confident that it is both a) covering important use cases and b) comprehensive enough to address the vast majority of use cases. But I'm of course open to discussion of the options; I suspect the result would be *more* options rather than removing some. If you want to take on this improvement, we can of course discuss further in the KIP thread :)

With respect to the version, new features should almost universally be worked on in trunk; older release branches are reserved for bug fixes. In this case, since we just cut the 1.1 branch, this would be a candidate for 1.2 / 2.0 and can simply be developed against trunk.
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354957#comment-16354957 ]

Prasanna Subburaj commented on KAFKA-6490:
------------------------------------------

[~ewencp]: Thanks for the feedback. What you are mentioning makes sense; we should give users options to choose from, because each use case is different. I feel the discard-and-log option can be provided to the user, but I am skeptical about the dead letter queue. Also, which version should this bug be worked on? And can I please get access to the Confluence wiki ([https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]) as well?
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354945#comment-16354945 ]

Ewen Cheslack-Postava commented on KAFKA-6490:
----------------------------------------------

A change in behavior like that would definitely require a KIP; existing users would not expect this at all. Connect started with the current behavior because, for many users, losing data is worse than suffering some downtime. However, it's clear some alternatives are warranted; this question comes up from time to time on the mailing lists. Generally there are only a few options that seem to make sense:

* Stop processing (current behavior) and log
* Log and retry (really only makes sense for unusual edge cases where data got corrupted in flight between Kafka and Connect)
* Discard and log (I care about uptime more than a bit of lost data)
* Dead letter queue (or some other fallback handler)

The retry case is probably the least important here, as it will rarely make a difference, so the other three are the ones I think we'd want to implement. A KIP for this should be straightforward, though the implementation will require care to make sure we handle all the places errors can occur (in the producer/consumer, during deserialization, during transformations, etc.).
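The policy list in the comment above can be made concrete with a small sketch. Everything here is hypothetical (the enum name, the in-memory stand-in for a dead letter topic); it only illustrates how a configurable error handler could dispatch among the discussed options:

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ErrorPolicySketch {

    // Illustrative policy names; no such config exists in Connect 1.0.
    enum ErrorPolicy { FAIL, LOG_AND_DISCARD, DEAD_LETTER_QUEUE }

    // Stand-in dead letter queue; a real design would produce to a Kafka topic.
    static final Deque<String> DLQ = new ArrayDeque<>();

    /** Returns true if processing should continue past the bad record. */
    static boolean handle(ErrorPolicy policy, String badRecord, RuntimeException cause) {
        switch (policy) {
            case FAIL:
                throw cause;                        // current behavior: stop the task
            case LOG_AND_DISCARD:
                System.err.println("Discarding: " + badRecord);
                return true;                        // uptime over a bit of lost data
            case DEAD_LETTER_QUEUE:
                DLQ.add(badRecord);                 // route aside for later inspection
                return true;
            default:
                throw new IllegalStateException("unknown policy");
        }
    }
}
```

The retry option is omitted from the sketch since, as noted, it rarely makes a difference; it would simply be another branch that returns the record to the front of the work queue.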
[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value
[ https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354924#comment-16354924 ]

Prasanna Subburaj commented on KAFKA-3832:
------------------------------------------

[~kkonstantine], [~wicknicks] Please advise on this issue.

> Kafka Connect's JSON Converter never outputs a null value
> ---------------------------------------------------------
>
>                 Key: KAFKA-3832
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3832
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.1
>            Reporter: Randall Hauch
>            Assignee: Prasanna Subburaj
>            Priority: Major
>              Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when {{enableSchemas=true}}. This means that when a connector outputs a {{SourceRecord}} with a null value, the JSON Converter will always produce a message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> This means that while Kafka log compaction will always be able to remove earlier messages with the same key, log compaction will never remove _all_ of the messages with the same key.
> The JSON Converter's {{fromConnectData(...)}} should always return null when it is supplied a null value.
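The proposed fix is a guard clause: return null bytes for a null value instead of serializing a `{"schema": null, "payload": null}` envelope. A hypothetical, stripped-down sketch (the real JsonConverter builds the envelope with Jackson; this stand-in only shows the guard):

```java
import java.nio.charset.StandardCharsets;

class NullValueSketch {

    // Stand-in for JsonConverter.fromConnectData(topic, schema, value).
    // The real method serializes Connect data; only the null guard matters here.
    static byte[] fromConnectData(String topic, Object schema, Object value) {
        if (value == null) {
            // A null byte[] becomes a null-valued Kafka record, which log
            // compaction treats as a tombstone and can eventually remove.
            return null;
        }
        String envelope = "{\"schema\": " + schema + ", \"payload\": " + value + "}";
        return envelope.getBytes(StandardCharsets.UTF_8);
    }
}
```

Without the guard, every "deleted" key keeps one compacted record forever, which is exactly the log-compaction problem the issue describes.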
[jira] [Assigned] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value
[ https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prasanna Subburaj reassigned KAFKA-3832:
----------------------------------------

    Assignee: Prasanna Subburaj
[jira] [Comment Edited] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354908#comment-16354908 ]

Prasanna Subburaj edited comment on KAFKA-6490 at 2/7/18 3:41 AM:
------------------------------------------------------------------

Thanks for your response [~mjsax]. I think we need a similar feature in Connect, because right now if we get a malformed JSON message the connector will fail and will not process any additional messages that come after it. Can you also please add me to the contributors list?

was (Author: prasanna1433):
Thanks for your response [~mjsax]. I think we need a similar feature in Connect, because right now if we get a malformed JSON message the connector will fail and will not process any additional messages that come after it. I can work on a KIP for solving this issue if the folks you tagged agree with what I am saying. Can you also please add me to the contributors list?
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354908#comment-16354908 ]

Prasanna Subburaj commented on KAFKA-6490:
------------------------------------------

Thanks for your response [~mjsax]. I think we need a similar feature in Connect, because right now if we get a malformed JSON message the connector will fail and will not process any additional messages that come after it. I can work on a KIP for solving this issue if the folks you tagged agree with what I am saying. Can you also please add me to the contributors list?
[jira] [Created] (KAFKA-6539) KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers alive
Song Younghwan created KAFKA-6539:
----------------------------------

             Summary: KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers alive
                 Key: KAFKA-6539
                 URL: https://issues.apache.org/jira/browse/KAFKA-6539
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 1.0.0
         Environment: Java 8; brokers on CentOS 7.4; consumers on Windows 10
            Reporter: Song Younghwan
         Attachments: consumer.log

I am considering using Kafka at my company, so I am currently doing failover testing.

Conditions:
* org.apache.kafka:kafka-clients:1.0.0
* New consumer using bootstrap.servers, a consumer group, and a group coordinator
* num. brokers = 3 (id #1, #2, #3)
* Topic num. partitions = 3, replication factor = 3
* offsets.topic.replication.factor = 3

Reproduction steps:
# Run consumers in the same consumer group, each subscribed to a topic
# Kill (kill -9) brokers #1 and #2 simultaneously (only #3 online)
# Consumers eventually connect to broker #3
# Start brokers #1 and #2 again after a while (#1, #2, #3 online)
# Kill (kill -9) brokers #2 and #3 simultaneously (only #1 online)
# *Now consumers endlessly try to connect to broker #3 only*
# Start broker #2 again after a while (#1, #2 online)
# *Consumers still blindly try to connect to broker #3*

Expectation: consumers successfully connect to broker #1 after step 5.

Record: I attached a consumer log file with TRACE log level. Related events below:
* 12:03:13 kills brokers #1, #2 simultaneously
* 12:03:42 starts brokers #1, #2 again
* 12:04:01 kills brokers #2, #3 simultaneously
* 12:04:42 starts broker #2 again
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354889#comment-16354889 ]

Bill Bejeck commented on KAFKA-2967:
------------------------------------

+1 for RST from me.

> Move Kafka documentation to ReStructuredText
> --------------------------------------------
>
>                 Key: KAFKA-2967
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2967
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Gwen Shapira
>            Assignee: Gwen Shapira
>            Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of Apache Server features
> I suggest:
> * Move to RST
> * Generate HTML and PDF during the build using a Sphinx plugin for Gradle
> Lots of Apache projects are doing this.
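To make the proposal concrete, here is a small, invented RST fragment showing what a docs page might look like after the migration (the page title and content are illustrative only; Sphinx renders the same source to both HTML and PDF):

```rst
Log Compaction
==============

Kafka can remove superseded records through *log compaction*,
enabled per topic:

.. code-block:: properties

   log.cleanup.policy=compact

.. note::

   This one source file can be rendered to both HTML and PDF,
   addressing the "can't generate PDFs" complaint above.
```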
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354876#comment-16354876 ]

huxihx commented on KAFKA-6524:
-------------------------------

[~omkreddy] Could you set `client.id=__admin_client` in producer.config and retry?

> kafka mirror can't producer internal topic
> -------------------------------------------
>
>                 Key: KAFKA-6524
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6524
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 1.0.0
>            Reporter: Ahmed Madkour
>            Priority: Minor
>
> We are using kafka-mirror-maker.sh to consume data from a 3-broker Kafka cluster and produce the data to another single-broker Kafka cluster. We want to include internal topics, so we added the following to the consumer configuration:
> exclude.internal.topics=false
> We keep receiving the following errors:
> {code:java}
> org.apache.kafka.common.errors.InvalidTopicException: The request attempted to perform an operation on an invalid topic.
> ERROR Error when sending message to topic __consumer_offsets with key: 43 bytes, value: 28 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> {code}
> It seems that the producer can't access the internal topic __consumer_offsets. Any way to fix that?
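Putting the two settings from this thread together, the MirrorMaker config files might look like the sketch below. Note the `client.id=__admin_client` workaround comes from the comment above and may not apply to every broker setup:

```properties
# consumer.config -- let MirrorMaker consume internal topics
# such as __consumer_offsets
exclude.internal.topics=false

# producer.config -- suggested workaround: brokers may only accept
# writes to internal topics from a client identified as __admin_client
client.id=__admin_client
```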
[jira] [Updated] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-6534:
---------------------------------

    Issue Type: Bug  (was: Improvement)

> Consumer.poll may not trigger rebalance in time when there is a task migration
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it will always call {{consumer.poll}}, hoping to trigger the rebalance and hence clean up the records buffered from partitions that are no longer owned. However, because the rebalance is based on heartbeat responses, which have a window of race, the rebalance is not always guaranteed to be triggered when task migration happens. As a result, records buffered in the consumer may not be cleaned up and may later be processed by Streams, which then realizes they no longer belong to the thread, causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on, and based on the default heartbeat.interval.ms value (3 sec), the race likelihood should not be high.
[jira] [Assigned] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration
[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-6534:
------------------------------------

    Assignee: Guozhang Wang
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354856#comment-16354856 ]

Guozhang Wang commented on KAFKA-2967:
--------------------------------------

I'm +1.
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354853#comment-16354853 ] Matthias J. Sax commented on KAFKA-2967: Big +1 for RST from my side. > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354852#comment-16354852 ] Matthias J. Sax commented on KAFKA-6490: I am not too familiar with the details of Connect. However, it sounds like this change might require a KIP. There was a similar issue for Streams, and we added a config so people can choose to resume or fail for this case: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers] \cc [~wicknicks] [~rhauch] [~kkonstantine] > JSON SerializationException Stops Connect > - > > Key: KAFKA-6490 > URL: https://issues.apache.org/jira/browse/KAFKA-6490 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: William R. Speirs >Priority: Major > Attachments: KAFKA-6490_v1.patch > > > If you configure KafkaConnect to parse JSON messages, and you send it a > non-JSON message, the SerializationException message will bubble up to the > top, and stop KafkaConnect. While I understand sending non-JSON to a JSON > serializer is a bad idea, I think that a single malformed message stopping > all of KafkaConnect is even worse. > The data exception is thrown here: > [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305] > > From the call here: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476] > This bubbles all the way up to the top, and KafkaConnect simply stops with > the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:172)}} > Thoughts on adding a {{try/catch}} around the {{for}} loop in > WorkerSinkTask's {{convertMessages}} so messages that don't properly parse > are logged, but simply ignored? 
This way KafkaConnect can keep working even > when it encounters a message it cannot decode? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
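The log-and-skip idea proposed in the description can be sketched as follows. This is a deliberately simplified, hypothetical stand-in for the loop in WorkerSinkTask's convertMessages; the names SkipOnError, convertAll, and convert are illustrations, not Connect APIs:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SkipOnError {
    // Convert every record, logging and skipping the ones that fail instead of
    // letting the exception bubble up and stop the whole task.
    static List<String> convertAll(List<byte[]> rawRecords) {
        List<String> converted = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            try {
                converted.add(convert(raw));
            } catch (IllegalArgumentException e) {
                // A real implementation would use the task's logger here.
                System.err.println("Skipping malformed record: " + e.getMessage());
            }
        }
        return converted;
    }

    // Toy stand-in for a JSON converter: rejects input that does not look like JSON.
    static String convert(byte[] raw) {
        String s = new String(raw, StandardCharsets.UTF_8);
        if (!s.startsWith("{")) {
            throw new IllegalArgumentException("not JSON: " + s);
        }
        return s;
    }
}
```

Whether skipping should be unconditional or opt-in via a config (as KIP-161 did for Streams) is exactly the design question that would need a KIP.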
[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
[ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354831#comment-16354831 ] Allen Wang commented on KAFKA-6514: --- [~ijuma] Since we don't change the name of the metric, but just add a tag, it shouldn't break existing monitoring systems. So I am wondering why a KIP is still required in this case. For example, I found that when upgrading to Kafka 0.10, the "records-consumed-rate" metric has an added tag "topic", which makes sense to me. I would prefer keeping the same metric name since people are already familiar with it and the name makes sense. It would also have minimal impact on the storage required on a typical metric aggregation system. > Add API version as a tag for the RequestsPerSec metric > -- > > Key: KAFKA-6514 > URL: https://issues.apache.org/jira/browse/KAFKA-6514 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Allen Wang >Priority: Major > > After we upgrade a broker to a new version, one important insight is to see how > many clients have been upgraded, so that we can switch the message format when > most of the clients have also been updated to the new version, to minimize the > performance penalty. > RequestsPerSec with the version tag will give us that insight. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354817#comment-16354817 ] ASF GitHub Bot commented on KAFKA-6472: --- joel-hamill opened a new pull request #126: KAFKA-6472 - Fix WordCount example code error URL: https://github.com/apache/kafka-site/pull/126 https://issues.apache.org/jira/browse/KAFKA-6472 - related https://github.com/apache/kafka/pull/4538 CC: @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > WordCount example code error > > > Key: KAFKA-6472 > URL: https://issues.apache.org/jira/browse/KAFKA-6472 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 0.11.0.2 >Reporter: JONYhao >Assignee: Joel Hamill >Priority: Trivial > Labels: newbie > Original Estimate: 168h > Remaining Estimate: 168h > > There is a ")" missing in the WordCount example tutorial > [https://kafka.apache.org/10/documentation/streams/tutorial] > at the end of the page, line 31: > {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}} > should be > {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354816#comment-16354816 ] ASF GitHub Bot commented on KAFKA-6472: --- joel-hamill opened a new pull request #4538: KAFKA-6472 - Fix WordCount example code error URL: https://github.com/apache/kafka/pull/4538 https://issues.apache.org/jira/browse/KAFKA-6472 CC: @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > WordCount example code error > > > Key: KAFKA-6472 > URL: https://issues.apache.org/jira/browse/KAFKA-6472 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 0.11.0.2 >Reporter: JONYhao >Assignee: Joel Hamill >Priority: Trivial > Labels: newbie > Original Estimate: 168h > Remaining Estimate: 168h > > There is a ")" missing in the WordCount example tutorial > [https://kafka.apache.org/10/documentation/streams/tutorial] > at the end of the page, line 31: > {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}} > should be > {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect
[ https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354805#comment-16354805 ] Prasanna Subburaj commented on KAFKA-6490: -- [~mjsax] Can you please help [~wspeirs] with this ticket? > JSON SerializationException Stops Connect > - > > Key: KAFKA-6490 > URL: https://issues.apache.org/jira/browse/KAFKA-6490 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: William R. Speirs >Priority: Major > Attachments: KAFKA-6490_v1.patch > > > If you configure KafkaConnect to parse JSON messages, and you send it a > non-JSON message, the SerializationException message will bubble up to the > top, and stop KafkaConnect. While I understand sending non-JSON to a JSON > serializer is a bad idea, I think that a single malformed message stopping > all of KafkaConnect is even worse. > The data exception is thrown here: > [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305] > > From the call here: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476] > This bubbles all the way up to the top, and KafkaConnect simply stops with > the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:172)}} > Thoughts on adding a {{try/catch}} around the {{for}} loop in > WorkerSinkTask's {{convertMessages}} so messages that don't properly parse > are logged, but simply ignored? This way KafkaConnect can keep working even > when it encounters a message it cannot decode? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance
[ https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354798#comment-16354798 ] ASF GitHub Bot commented on KAFKA-6430: --- ying-zheng opened a new pull request #4537: KAFKA-6430: Add buffer between Java data stream and gzip stream URL: https://github.com/apache/kafka/pull/4537 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve Kafka GZip compression performance > -- > > Key: KAFKA-6430 > URL: https://issues.apache.org/jira/browse/KAFKA-6430 > Project: Kafka > Issue Type: Improvement > Components: clients, compression, core >Reporter: Ying Zheng >Priority: Minor > > To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream: > new DataOutputStream(new GZIPOutputStream(buffer, bufferSize)); > To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream: >new DataInputStream(new GZIPInputStream(buffer)); > This is very straightforward, but actually inefficient. For each message, in > addition to the key and value data, Kafka has to write about 30 some metadata > bytes (slightly varies in different Kafka versions), including magic byte, > checksum, timestamp, offset, key length, value length etc. 
For each of these > bytes, java DataOutputStream has to call write(byte) once. Here is the > awkward writeInt() method in DataOutputStream, which writes 4 bytes > separately in big-endian order. > {code} > public final void writeInt(int v) throws IOException { > out.write((v >>> 24) & 0xFF); > out.write((v >>> 16) & 0xFF); > out.write((v >>> 8) & 0xFF); > out.write((v >>> 0) & 0xFF); > incCount(4); > } > {code} > Unfortunately, GZIPOutputStream does not implement the write(byte) method. > Instead, it only provides a write(byte[], offset, len) method, which calls > the corresponding JNI zlib function. The write(byte) calls from > DataOutputStream are translated into write(byte[], offset, len) calls in a > very inefficient way: (Oracle JDK 1.8 code) > {code} > class DeflaterOutputStream { > public void write(int b) throws IOException { > byte[] buf = new byte[1]; > buf[0] = (byte)(b & 0xff); > write(buf, 0, 1); > } > public void write(byte[] b, int off, int len) throws IOException { > if (def.finished()) { > throw new IOException("write beyond end of stream"); > } > if ((off | len | (off + len) | (b.length - (off + len))) < 0) { > throw new IndexOutOfBoundsException(); > } else if (len == 0) { > return; > } > if (!def.finished()) { > def.setInput(b, off, len); > while (!def.needsInput()) { > deflate(); > } > } > } > } > class GZIPOutputStream extends DeflaterOutputStream { > public synchronized void write(byte[] buf, int off, int len) > throws IOException > { > super.write(buf, off, len); > crc.update(buf, off, len); > } > } > class Deflater { > private native int deflateBytes(long addr, byte[] b, int off, int len, int > flush); > } > class CRC32 { > public void update(byte[] b, int off, int len) { > if (b == null) { > throw new NullPointerException(); > } > if (off < 0 || len < 0 || off > b.length - len) { > throw new ArrayIndexOutOfBoundsException(); > } > crc = updateBytes(crc, b, off, len); > } > private native static int updateBytes(int crc, byte[] b, int off, 
int > len); > } > {code} > For each metadata byte, the code above has to allocate a single one-byte array, > acquire several locks, and call two native JNI methods (Deflater.deflateBytes and > CRC32.updateBytes). In each Kafka message, there are about 30 some metadata > bytes. > The call stack of Deflater.deflateBytes(): >
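The buffering fix named in the PR title above can be sketched like this: a BufferedOutputStream between the DataOutputStream and the GZIPOutputStream absorbs the single-byte metadata writes, so the JNI deflate and CRC calls each see a full buffer instead of a one-byte array. This is a minimal illustration, not Kafka's actual code:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipBuffering {
    // Compress ints; the BufferedOutputStream batches DataOutputStream's
    // one-byte writes before they reach the gzip/JNI layer.
    static byte[] compress(int[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(bytes), 8192))) {
            for (int v : values) {
                out.writeInt(v); // four single-byte writes, absorbed by the buffer
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Symmetric read path: buffer between DataInputStream and GZIPInputStream.
    static int[] decompress(byte[] compressed, int count) {
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(
                        new GZIPInputStream(new ByteArrayInputStream(compressed)), 8192))) {
            int[] values = new int[count];
            for (int i = 0; i < count; i++) {
                values[i] = in.readInt();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```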
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354762#comment-16354762 ] Ewen Cheslack-Postava commented on KAFKA-2967: -- I'd like to raise this once again and see what people think of the current state. The docs have grown increasingly hacky, especially with the streams docs sort of ending up on their own page, weirdly integrated with the table of contents, javascript hacks to get things rendered properly client-side, and what I consider a mess of SSIs. I'd like to propose a solution that I think addresses most of the concerns from the thread. I had some offline discussion with some other folks and we discussed a few different options, including markdown, which I assumed would be the most popular for people writing docs. We concluded that the support from just simple markdown wasn't going to be good enough and that usually you end up having to adopt some layer on top of it anyway (and have to learn all the details of that tool). Depending on what you choose, the ability to integrate with the rest of the site well has varying levels of difficulty. I'd like to propose an alternative that Gwen mentioned originally – use sphinx/rst. * Theming to both reuse existing styling and layout is trivial. * It's a full docs solution and has all the doodads and plugins we'd need as the docs continue to grow. * Migrate progressively, and only migrate docs for now. We'll just be compiling down to html anyway, so we can move progressively, and can choose which parts of the site we want to live under sphinx. Of course there is one caveat that the initial conversion will probably require a big chunk of work since the docs are currently just one giant page and we'd probably want it broken up from the get-go. But html content could still easily be used and linked to. * Can easily escape hatch back to raw html if we ever need it / want more flexibility in layout / etc. It's a trivial command to do so. 
* Maybe most importantly, I'm prepared to take on the work of setting up the build & doing initial conversion. The main drawback is that not everyone will be familiar with rst. As I mentioned, this is actually a problem with even markdown, as you usually need additional tooling on top of the raw syntax, so I don't see it as a big problem. The other annoyance is that while we can include html snippets (e.g. configs and metrics autogenerated docs) directly, it'll look a lot better if we actually convert the autogeneration to spit out rst instead. We don't have too many of these and it's just another method to generate the output, so probably best to just do that at the same time as the initial conversion. I think it is easy to bikeshed on something like this (everyone has their favorite tool), but does anybody have any objections to this proposal? I'd like to be able to move forward with this soon as I know dealing with the current form of docs has been a constant pain - there are fewer Connect docs than there should be (and would be if it wasn't so hard to add them) and Streams is constantly fighting with the html docs. If nobody pipes up in a few days, I'll plan to execute and we can take the rest of the discussion to a PR. > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
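For reference, the raw-HTML escape hatch mentioned above looks like this in Sphinx/RST (a generic illustration, not taken from the Kafka docs; the table content is made up):

```rst
.. raw:: html

   <table class="data-table">
     <tr><th>Config</th><th>Default</th></tr>
     <tr><td>retention.ms</td><td>604800000</td></tr>
   </table>
```

Anything the RST toolchain cannot express natively can be dropped in verbatim this way, which is what makes a progressive migration from the existing HTML practical.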
[jira] [Created] (KAFKA-6538) Enhance ByteStore exceptions with more context information
Matthias J. Sax created KAFKA-6538: -- Summary: Enhance ByteStore exceptions with more context information Key: KAFKA-6538 URL: https://issues.apache.org/jira/browse/KAFKA-6538 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.2.0 Reporter: Matthias J. Sax In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and only have concrete key/value types on outer layers/wrappers of the stores. For this reason, the innermost {{RocksDBStore}} cannot provide useful error messages anymore if a put/get/delete operation fails, as it only handles plain bytes. Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with corresponding information about which key/value the operation failed for, in the wrapping stores (KeyValueStore, WindowedStore, and SessionStore). Cf. https://github.com/apache/kafka/pull/4518 which cleans up {{RocksDBStore}} exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
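The proposed enhancement amounts to catching the inner byte store's exception in the typed wrapper and rethrowing it with the key that failed, which the bytes-only inner store cannot know about. A rough sketch under assumed names (ContextualStore and BytesStore are hypothetical, not Streams classes):

```java
class ContextualStore {
    // Minimal stand-in for the bytes-only inner store interface.
    interface BytesStore {
        void put(byte[] key, byte[] value);
    }

    private final BytesStore inner;
    private final String storeName;

    ContextualStore(BytesStore inner, String storeName) {
        this.inner = inner;
        this.storeName = storeName;
    }

    // The typed wrapper knows the original key, so it can rethrow the inner
    // store's bytes-level failure with context the user can act on.
    void put(String key, String value) {
        try {
            inner.put(key.getBytes(), value.getBytes());
        } catch (RuntimeException e) {
            throw new RuntimeException(
                    "Error while putting key '" + key + "' into store '" + storeName + "'", e);
        }
    }
}
```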
[jira] [Commented] (KAFKA-2423) Introduce Scalastyle
[ https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354661#comment-16354661 ] Ray Chiang commented on KAFKA-2423: --- [~granthenke], let me know if I can take this one over. > Introduce Scalastyle > > > Key: KAFKA-2423 > URL: https://issues.apache.org/jira/browse/KAFKA-2423 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Grant Henke >Priority: Major > > This is similar to Checkstyle (which we already use), but for Scala: > http://www.scalastyle.org/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
[ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354625#comment-16354625 ] Ismael Juma commented on KAFKA-6514: Thanks for the JIRA [~allenxwang]. Agreed that this would be useful. A couple of comments/questions: # For compatibility reasons, we may consider keeping the metric without a version and adding new metrics with the version tag. # We need a simple KIP since this affects metrics, which are public interfaces. # Given that the KIP freeze for 1.1.0 was on 23 January ([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546]), this would have to target the next release. > Add API version as a tag for the RequestsPerSec metric > -- > > Key: KAFKA-6514 > URL: https://issues.apache.org/jira/browse/KAFKA-6514 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Allen Wang >Priority: Major > > After we upgrade a broker to a new version, one important insight is to see how > many clients have been upgraded, so that we can switch the message format when > most of the clients have also been updated to the new version, to minimize the > performance penalty. > RequestsPerSec with the version tag will give us that insight. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354610#comment-16354610 ] Matthias J. Sax commented on KAFKA-6535: I don't think we should do this by default. I am just wondering if we should allow users to "tell" Kafka Streams to purge data? > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6536) Streams quickstart pom.xml is missing versions for a bunch of plugins
[ https://issues.apache.org/jira/browse/KAFKA-6536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6536: - Labels: newbie (was: ) > Streams quickstart pom.xml is missing versions for a bunch of plugins > - > > Key: KAFKA-6536 > URL: https://issues.apache.org/jira/browse/KAFKA-6536 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 0.11.0.2, 1.0.1 >Reporter: Ewen Cheslack-Postava >Priority: Major > Labels: newbie > Original Estimate: 1h > Remaining Estimate: 1h > > There are a bunch of plugins being used that maven helpfully warns you about > being unversioned: > {code:java} > > [INFO] Scanning for projects... > > [WARNING] > > [WARNING] Some problems were encountered while building the effective model > > for org.apache.kafka:streams-quickstart-java:maven-archetype:1.0.1 > > [WARNING] 'build.plugins.plugin.version' for > > org.apache.maven.plugins:maven-shade-plugin is missing. @ > > org.apache.kafka:streams-quickstart:1.0.1, > > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > > line 64, column 21 > > [WARNING] 'build.plugins.plugin.version' for > > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ > > org.apache.kafka:streams-quickstart:1.0.1, > > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, > > line 74, column 21 > > [WARNING] > > [WARNING] Some problems were encountered while building the effective model > > for org.apache.kafka:streams-quickstart:pom:1.0.1 > > [WARNING] 'build.plugins.plugin.version' for > > org.apache.maven.plugins:maven-shade-plugin is missing. @ line 64, column 21 > > [WARNING] 'build.plugins.plugin.version' for > > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ line 74, > > column 21 > > [WARNING] > > [WARNING] It is highly recommended to fix these problems because they > > threaten the stability of your build. 
> > [WARNING] > > [WARNING] For this reason, future Maven versions might no longer support > > building such malformed projects.{code} > Unversioned dependencies are dangerous as they make the build > non-reproducible. In fact, a released version may become very difficult to > build as the user would have to track down the working versions of the > plugins. This seems particularly bad for the quickstart as it's likely to be > copy/pasted into people's own projects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
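Fixing the warnings above means pinning each plugin version explicitly in the quickstart pom, roughly like this (the version numbers below are placeholders for illustration, not the versions the release should actually use — those need to be the ones the build was tested with):

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <!-- placeholder version; pin to the tested release -->
      <version>3.1.0</version>
    </plugin>
    <plugin>
      <groupId>com.github.siom79.japicmp</groupId>
      <artifactId>japicmp-maven-plugin</artifactId>
      <!-- placeholder version; pin to the tested release -->
      <version>0.11.0</version>
    </plugin>
  </plugins>
</build>
```

With explicit versions the build becomes reproducible, and anyone copy/pasting the quickstart pom gets a known-good plugin set instead of whatever Maven resolves that day.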
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354548#comment-16354548 ] Guozhang Wang commented on KAFKA-6535: -- I think we cannot safely do that for user topics, since these topics may be shared by multiple applications, and some of them may not even be written by Streams. Though admittedly such sharing may not be common in practice, we still cannot assume it. > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values
[ https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354546#comment-16354546 ] Guozhang Wang commented on KAFKA-4750: -- As a summary of this resolution: {code} Here is the new rule for handling nulls: * in the interface store, put(key, null) is handled normally and the value serde is applied to null. * in the innermost store, null bytes after serialization will always be treated as deletes. * in the innermost store, range queries returning iterators should never return null bytes. * in the interface store, if null bytes get returned in get(key), the serde will be skipped and a null object will be returned. {code} > KeyValueIterator returns null values > > > Key: KAFKA-4750 > URL: https://issues.apache.org/jira/browse/KAFKA-4750 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0 >Reporter: Michal Borowiecki >Assignee: Evgeny Veretennikov >Priority: Major > Labels: newbie > Fix For: 1.1.0 > > Attachments: DeleteTest.java > > > The API for the ReadOnlyKeyValueStore.range method promises the returned iterator > will not return null values. However, after upgrading from 0.10.0.0 to > 0.10.1.1 we found null values are returned, causing NPEs on our side. > I found this happens after removing entries from the store, and I found > resemblance to the SAMZA-94 defect. The problem seems to be as it was there: when > deleting entries and having a serializer that does not return null when null > is passed in, the state store doesn't actually delete that key/value pair, but > the iterator will return a null value for that key. > When I modified our serializer to return null when null is passed in, the > problem went away. However, I believe this should be fixed in Kafka Streams, > perhaps with a similar approach as SAMZA-94. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
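The four null-handling rules above can be illustrated with a toy two-layer store (hypothetical code, not the Streams implementation): the interface layer serializes the value, the inner byte layer treats null bytes as a delete, and on read the serde is bypassed when the bytes are null.

```java
import java.util.HashMap;
import java.util.Map;

class NullRulesStore {
    // Inner bytes-only layer, standing in for the innermost store.
    private final Map<String, byte[]> innerBytes = new HashMap<>();

    // Interface-level put: the serde is applied even to null; null bytes
    // then act as a delete in the inner store.
    void put(String key, String value) {
        byte[] serialized = (value == null) ? null : value.getBytes();
        if (serialized == null) {
            innerBytes.remove(key); // null bytes treated as a delete
        } else {
            innerBytes.put(key, serialized);
        }
    }

    // Interface-level get: on null bytes the serde is skipped and the
    // null object is returned directly.
    String get(String key) {
        byte[] raw = innerBytes.get(key);
        return (raw == null) ? null : new String(raw);
    }
}
```

Because deletes are decided on the serialized bytes, a serde that maps null to non-null bytes (the situation in the bug report) can no longer leave phantom entries behind.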
[jira] [Updated] (KAFKA-4750) KeyValueIterator returns null values
[ https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-4750: - Fix Version/s: 1.1.0 > KeyValueIterator returns null values > > > Key: KAFKA-4750 > URL: https://issues.apache.org/jira/browse/KAFKA-4750 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0 >Reporter: Michal Borowiecki >Assignee: Evgeny Veretennikov >Priority: Major > Labels: newbie > Fix For: 1.1.0 > > Attachments: DeleteTest.java > > > The API for the ReadOnlyKeyValueStore.range method promises the returned iterator > will not return null values. However, after upgrading from 0.10.0.0 to > 0.10.1.1 we found null values are returned, causing NPEs on our side. > I found this happens after removing entries from the store, and I found > resemblance to the SAMZA-94 defect. The problem seems to be as it was there: when > deleting entries and having a serializer that does not return null when null > is passed in, the state store doesn't actually delete that key/value pair, but > the iterator will return a null value for that key. > When I modified our serializer to return null when null is passed in, the > problem went away. However, I believe this should be fixed in Kafka Streams, > perhaps with a similar approach as SAMZA-94. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354332#comment-16354332 ] Matthias J. Sax commented on KAFKA-5253: I double-checked {{TopologyTestDriver}} and it is not able to handle Pattern subscriptions. The fix should be fairly easy; however, it requires a public API change and thus a KIP. > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern generates an > exception, as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. 
> {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStream source = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5253: --- Affects Version/s: (was: 0.10.2.1) 1.1.0 > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. 
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>     // -- build test driver
>     driver = new KStreamTestDriver(builder);
>     driver.setTime(0L);
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>     // -- validate
>     // no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for this change.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
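The pattern subscription in the test above relies on plain `java.util.regex` matching; a minimal, self-contained sketch (topic names other than those in the report are illustrative) of which topics `topic-source-\d` would capture:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {
    public static void main(String[] args) {
        // Same pattern as in the failing test: "topic-source-" plus exactly one digit.
        Pattern sourcePattern = Pattern.compile("topic-source-\\d");

        List<String> topics = Arrays.asList("topic-source-3", "topic-source-42", "topic-sink");

        // Only topics whose FULL name matches the pattern would be subscribed.
        List<String> matched = topics.stream()
                .filter(t -> sourcePattern.matcher(t).matches())
                .collect(Collectors.toList());

        System.out.println(matched); // [topic-source-3]
    }
}
```

Note that `matches()` requires a full-string match, so `topic-source-42` (two digits) is excluded; this is the same semantics a pattern-based source applies when deciding which topics overlap an explicitly added topic name.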
[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5253: --- Description:
*Context*
-KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to unit test topologies while developing KStreams apps. One such topology uses a Pattern to consume from multiple topics at once.
*Problem*
The unit test of the topology fails because -KStreamTestDriver- TopologyTestDriver fails to deal with Patterns properly.
*Example*
Underneath is a unit test explaining what I understand should happen, but is failing. Explicitly adding a source topic matching the topic pattern generates an exception, as the topology builder explicitly checks for overlapping topic names and patterns, in any order of adding pattern and topic. So it is intended behaviour.
{code:java}
@Test
public void shouldProcessFromSourcesThatDoMatchThePattern() {
    // -- setup stream pattern
    final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
    source.to("topic-sink");
    // -- setup processor to capture results
    final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
    source.process(processorSupplier);
    // -- add source to stream data from
    //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
    // -- build test driver
    driver = new KStreamTestDriver(builder);
    driver.setTime(0L);
    // -- test
    driver.process("topic-source-3", "A", "aa");
    // -- validate
    // no exception was thrown
    assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
}
{code}
*Solution*
If anybody can help in defining the solution, I can create a pull request for this change.
was:
*Context*
KStreamTestDriver is being used to unit test topologies while developing KStreams apps. One such topology uses a Pattern to consume from multiple topics at once.
*Problem*
The unit test of the topology fails because KStreamTestDriver fails to deal with Patterns properly.
*Example*
Underneath is a unit test explaining what I understand should happen, but is failing. Explicitly adding a source topic matching the topic pattern generates an exception, as the topology builder explicitly checks for overlapping topic names and patterns, in any order of adding pattern and topic. So it is intended behaviour.
{code:java}
@Test
public void shouldProcessFromSourcesThatDoMatchThePattern() {
    // -- setup stream pattern
    final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
    source.to("topic-sink");
    // -- setup processor to capture results
    final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
    source.process(processorSupplier);
    // -- add source to stream data from
    //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
    // -- build test driver
    driver = new KStreamTestDriver(builder);
    driver.setTime(0L);
    // -- test
    driver.process("topic-source-3", "A", "aa");
    // -- validate
    // no exception was thrown
    assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
}
{code}
*Solution*
If anybody can help in defining the solution, I can create a pull request for this change.
> TopologyTestDriver must handle streams created with patterns
> 
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 1.1.0
> Reporter: Wim Van Leuven
> Priority: Major
> Labels: beginner, needs-kip, newbie
>
> *Context*
> -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because -KStreamTestDriver- TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but is failing. 
> Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStream source = >
[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5253: --- Summary: TopologyTestDriver must handle streams created with patterns (was: KStreamTestDriver must handle streams created with patterns) > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.10.2.1 >Reporter: Wim Van Leuven >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > KStreamTestDriver is being used to unit test topologies while developing > KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because KStreamTestDriver fails to deal > with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. 
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>     // -- build test driver
>     driver = new KStreamTestDriver(builder);
>     driver.setTime(0L);
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>     // -- validate
>     // no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for this change.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5253) KStreamTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5253: --- Labels: beginner needs-kip newbie (was: ) > KStreamTestDriver must handle streams created with patterns > --- > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 0.10.2.1 >Reporter: Wim Van Leuven >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > KStreamTestDriver is being used to unit test topologies while developing > KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because KStreamTestDriver fails to deal > with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. 
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>     // -- build test driver
>     driver = new KStreamTestDriver(builder);
>     driver.setTime(0L);
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>     // -- validate
>     // no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for this change.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6474: --- Labels: beginner newbie (was: ) > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added the public TopologyTestDriver. We should rewrite our own > tests to use this new test driver and remove the two classes > ProcessorTopologyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-5889) MetricsTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-5889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu reopened KAFKA-5889: --- As of 332e698ac9c74ce29317021b03a54512c92ac8b3, I got:
{code}
kafka.metrics.MetricsTest > testMetricsLeak FAILED
    java.lang.AssertionError: expected:<1421> but was:<1424>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at kafka.metrics.MetricsTest$$anonfun$testMetricsLeak$1.apply$mcVI$sp(MetricsTest.scala:68)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at kafka.metrics.MetricsTest.testMetricsLeak(MetricsTest.scala:66)
{code}
> MetricsTest is flaky
> 
> Key: KAFKA-5889
> URL: https://issues.apache.org/jira/browse/KAFKA-5889
> Project: Kafka
> Issue Type: Test
> Reporter: Ted Yu
> Priority: Major
>
> The following appeared in several recent builds (e.g. https://builds.apache.org/job/kafka-trunk-jdk7/2758):
> {code}
> kafka.metrics.MetricsTest > testMetricsLeak FAILED
>     java.lang.AssertionError: expected:<1216> but was:<1219>
>         at org.junit.Assert.fail(Assert.java:88)
>         at org.junit.Assert.failNotEquals(Assert.java:834)
>         at org.junit.Assert.assertEquals(Assert.java:645)
>         at org.junit.Assert.assertEquals(Assert.java:631)
>         at kafka.metrics.MetricsTest$$anonfun$testMetricsLeak$1.apply$mcVI$sp(MetricsTest.scala:68)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>         at kafka.metrics.MetricsTest.testMetricsLeak(MetricsTest.scala:66)
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
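Failures like `expected:<1421> but was:<1424>` compare a metric count that other threads may still be mutating at assertion time. One common remedy for this class of flakiness (a sketch of the general technique, not the project's actual fix; names and values are illustrative) is to poll until the value converges instead of asserting a single snapshot:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class RetryAssertSketch {
    // Polls the supplier until it reports the expected value or the timeout elapses.
    static boolean waitForValue(IntSupplier actual, int expected, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (actual.getAsInt() == expected) {
                return true;
            }
            Thread.sleep(10); // back off briefly before re-checking
        }
        return actual.getAsInt() == expected;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger metricCount = new AtomicInteger(1421);
        // Simulate a background thread that has not finished updating yet.
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            metricCount.set(1424);
        }).start();

        // A one-shot assertEquals here could observe 1421 and fail;
        // polling tolerates the transient state.
        System.out.println(waitForValue(metricCount::get, 1424, 2_000)); // true
    }
}
```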
[jira] [Commented] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log
[ https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354323#comment-16354323 ] ASF GitHub Bot commented on KAFKA-6184: --- junrao closed pull request #4191: KAFKA-6184: report a metric of the lag between the consumer offset ... URL: https://github.com/apache/kafka/pull/4191 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26f8fb..55b7bbb9a8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -578,6 +579,11 @@ private long endTimestamp() {
                 if (partitionLag != null)
                     this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
+                Long lead = subscriptions.partitionLead(partitionRecords.partition);
+                if (lead != null) {
+                    this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+                }
+
                 return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
@@ -860,6 +866,11 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
                 subscriptions.updateHighWatermark(tp, partition.highWatermark);
             }
+            if (partition.logStartOffset >= 0) {
+                log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
+                subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+            }
+
             if (partition.lastStableOffset >= 0) {
                 log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                 subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@@ -934,7 +945,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
     @Override
     public void onAssignment(Set<TopicPartition> assignment) {
-        sensors.updatePartitionLagSensors(assignment);
+        sensors.updatePartitionLagAndLeadSensors(assignment);
     }

     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@@ -1250,6 +1261,7 @@ protected void increment(int bytes, int records) {
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
+        private final Sensor recordsFetchLead;

         private Set<TopicPartition> assignedPartitions;
@@ -1276,6 +1288,9 @@ private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegis
         this.recordsFetchLag = metrics.sensor("records-lag");
         this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
+
+        this.recordsFetchLead = metrics.sensor("records-lead");
+        this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
     }

     private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1311,16 +1326,37 @@ private void recordTopicFetchMetrics(String topic, int bytes, int records) {
         recordsFetched.record(records);
     }

-    private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
+    private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
         if (this.assignedPartitions != null) {
             for (TopicPartition tp : this.assignedPartitions) {
-                if (!assignedPartitions.contains(tp))
+                if (!assignedPartitions.contains(tp)) {
                     metrics.removeSensor(partitionLagMetricName(tp));
+                    metrics.removeSensor(partitionLeadMetricName(tp));
+                }
             }
         }
         this.assignedPartitions = assignedPartitions;
     }

+    private void recordPartitionLead(TopicPartition tp, long lead) {
+
[jira] [Resolved] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log
[ https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6184. Resolution: Fixed Fix Version/s: 1.2.0 The PR is merged to trunk. > report a metric of the lag between the consumer offset and the start offset > of the log > -- > > Key: KAFKA-6184 > URL: https://issues.apache.org/jira/browse/KAFKA-6184 > Project: Kafka > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: Jun Rao >Assignee: huxihx >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > Currently, the consumer reports a metric of the lag between the high > watermark of a log and the consumer offset. It will be useful to report a > similar lag metric between the consumer offset and the start offset of the > log. If this latter lag gets close to 0, it's an indication that the consumer > may lose data soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
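The relationship between the existing lag metric and the new lead metric added by this issue is simple offset arithmetic. A self-contained sketch with made-up offset values (the real metrics are computed inside the consumer's `Fetcher`, not like this):

```java
public class LagLeadSketch {
    public static void main(String[] args) {
        long logStartOffset = 100L;   // oldest offset still retained in the log
        long highWatermark = 5_000L;  // newest fully replicated offset
        long consumerOffset = 180L;   // next offset the consumer will fetch

        // Existing metric: how far the consumer trails the newest data.
        long lag = highWatermark - consumerOffset;   // 4820

        // New metric from this issue: how far the consumer is ahead of deletion.
        long lead = consumerOffset - logStartOffset; // 80

        // A lead close to 0 means retention may delete records before
        // the consumer reads them, i.e. the consumer may lose data soon.
        System.out.println("lag=" + lag + " lead=" + lead);
    }
}
```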
[jira] [Updated] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6526: -- Fix Version/s: 1.2.0 Since it is too late to merge this for 1.1, moving this out to the next release. > Update controller to handle changes to unclean.leader.election.enable > - > > Key: KAFKA-6526 > URL: https://issues.apache.org/jira/browse/KAFKA-6526 > Project: Kafka > Issue Type: Improvement >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > At the moment, updates to the default unclean.leader.election.enable use the > same code path as updates to topic overrides. This requires a controller change > for the new value to take effect. It would be good if we could update the > controller to handle the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6526: -- Issue Type: Improvement (was: Sub-task) Parent: (was: KAFKA-6240) > Update controller to handle changes to unclean.leader.election.enable > - > > Key: KAFKA-6526 > URL: https://issues.apache.org/jira/browse/KAFKA-6526 > Project: Kafka > Issue Type: Improvement >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > > At the moment, updates to the default unclean.leader.election.enable use the > same code path as updates to topic overrides. This requires a controller change > for the new value to take effect. It would be good if we could update the > controller to handle the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5971) Broker keeps running even though not registered in ZK
[ https://issues.apache.org/jira/browse/KAFKA-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354265#comment-16354265 ] Andrej Urvantsev commented on KAFKA-5971: - I think that we faced the same problem. Running Kafka 1.0.0:
{noformat}
[2018-02-05 19:43:23,460] INFO Unable to read additional data from server sessionid 0x101aa15a51e000a, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,561] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,584] INFO Opening socket connection to server 172.30.35.44/172.30.35.44:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,673] INFO Socket connection established to 172.30.35.44/172.30.35.44:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,700] WARN Unable to reconnect to ZooKeeper service, session 0x101aa15a51e000a has expired (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,700] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,700] INFO Unable to reconnect to ZooKeeper service, session 0x101aa15a51e000a has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,700] INFO Initiating client connection, connectString=172.30.34.209:2181,172.30.36.237:2181,172.30.35.44:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6572421 (org.apache.zookeeper.ZooKeeper)
[2018-02-05 19:43:23,703] INFO Opening socket connection to server 172.30.34.209/172.30.34.209:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,704] INFO Socket connection established to 172.30.34.209/172.30.34.209:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,704] INFO EventThread shut down for session: 0x101aa15a51e000a (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,705] INFO Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,805] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,805] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:24,523] INFO Opening socket connection to server 172.30.36.237/172.30.36.237:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,523] INFO Socket connection established to 172.30.36.237/172.30.36.237:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,524] INFO Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,886] INFO Socket connection established to 172.30.35.44/172.30.35.44:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:25,460] INFO Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,327] INFO Opening socket connection to server 172.30.34.209/172.30.34.209:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,328] INFO Socket connection established to 172.30.34.209/172.30.34.209:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,332] INFO Session establishment complete on server 172.30.34.209/172.30.34.209:2181, sessionid = 0x101b18ebf3b0001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,332] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:27,334] INFO re-registering broker info in ZK for broker 3523 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-02-05 19:43:27,334] INFO Creating /brokers/ids/3523 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-02-05 19:43:27,340] INFO Result of znode creation is: NODEEXISTS (kafka.utils.ZKCheckedEphemeral)
[2018-02-05 19:43:27,341] ERROR Error handling event ZkEvent[New session event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@493b2c1d] (org.I0Itec.zkclient.ZkEventThread)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/3523. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
at
[jira] [Updated] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https
[ https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-3665: -- Fix Version/s: 2.0.0 > Default ssl.endpoint.identification.algorithm should be https > - > > Key: KAFKA-3665 > URL: https://issues.apache.org/jira/browse/KAFKA-3665 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > The default `ssl.endpoint.identification.algorithm` is `null` which is not a > secure default (man in the middle attacks are possible). > We should probably use `https` instead. A more conservative alternative would > be to update the documentation instead of changing the default. > A paper on the topic (thanks to Ryan Pridgeon for the reference): > http://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf -- This message was sent by Atlassian JIRA (v7.6.3#76005)
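Until the default changes, a client that wants hostname verification must opt in explicitly. An illustrative snippet (using `java.util.Properties` only, as a Kafka client config is built from; the property name is the one discussed in this issue):

```java
import java.util.Properties;

public class SslClientConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // With the current null default, the broker certificate is NOT
        // checked against the hostname being dialed, which permits
        // man-in-the-middle attacks. Setting "https" enables hostname
        // verification of the server certificate.
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.endpoint.identification.algorithm", "https");

        System.out.println(props.getProperty("ssl.endpoint.identification.algorithm")); // https
    }
}
```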
[jira] [Assigned] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https
[ https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reassigned KAFKA-3665: - Assignee: Rajini Sivaram > Default ssl.endpoint.identification.algorithm should be https > - > > Key: KAFKA-3665 > URL: https://issues.apache.org/jira/browse/KAFKA-3665 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.9.0.1, 0.10.0.0 >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > > The default `ssl.endpoint.identification.algorithm` is `null` which is not a > secure default (man in the middle attacks are possible). > We should probably use `https` instead. A more conservative alternative would > be to update the documentation instead of changing the default. > A paper on the topic (thanks to Ryan Pridgeon for the reference): > http://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354259#comment-16354259 ] Nick Travers commented on KAFKA-4669: - Chiming in again to note that we're still running into this issue intermittently. The failure mode is the same, with a BufferUnderflowException and stack trace similar to what I posted above. For some additional context, when this occurs it ultimately leads to a JVM that cannot exit, as it is waiting on a latch that will never be closed. Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x7f30b4003000 nid=0x195a1 waiting on condition [0x7f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
        - parking to wait for <0x0007852b1b68> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
        at java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
        at com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
        - locked <0x000728c71998> (a com.squareup.kafka.ng.producer.KafkaProducer)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34] that is still open in the ProduceRequestResult. I assume that the network thread is responsible for closing that, but if that thread crashes for whatever reason, it never gets a chance to call CountDownLatch#countDown. Arguably, we should probably be using a combination of daemon threads and the timed version of Future#get, but it _feels_ like something that could be fixed in the producer client, even if it's just for the sake of ensuring that failed ProduceRequestResults can be GC'd eventually, which can't happen if another thread is hung waiting on the latch. cc: [~rsivaram] [~hachikuji]
> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -
> 
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Assignee: Rajini Sivaram
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
> There is no try catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never get called, and KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case. First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> at
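The hang described in the comment above comes from an unbounded `CountDownLatch.await()` on a latch that is never counted down. A stdlib-only sketch of why the timed variant the commenter suggests cannot hang (latch and timeout values are illustrative, not Kafka's):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {
    public static void main(String[] args) throws InterruptedException {
        // Models ProduceRequestResult's completion latch when the network
        // thread dies before calling countDown(): nobody ever opens it.
        CountDownLatch neverCompleted = new CountDownLatch(1);

        // await() with no timeout would park the calling thread forever --
        // that is the stuck "async-message-sender-0" thread in the dump above.
        // The timed variant simply returns false instead of hanging.
        boolean completed = neverCompleted.await(100, TimeUnit.MILLISECONDS);
        System.out.println(completed); // false
    }
}
```

The timed form lets the caller fail the send with a timeout error and release its reference, so the abandoned result object can eventually be garbage collected.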
[jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread
[ https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354236#comment-16354236 ] ASF GitHub Bot commented on KAFKA-4641: --- guozhangwang opened a new pull request #4531: KAFKA-4641: Add more unit test for stream thread URL: https://github.com/apache/kafka/pull/4531 Before the patch, jacoco coverage test:

Element | Missed Instructions | Cov. | Missed Branches | Cov. | Missed | Cxty | Missed | Lines | Missed | Methods | Missed | Classes
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
Total | 3,386 of 22,177 | 84% | 336 of 1,639 | 79% | 350 | 1,589 | 526 | 4,451 | 103 | 768 | 1 | 102
StreamThread | | 77% | | 76% | 27 | 102 | 48 | 299 | 1 | 31 | 0 | 1

After the patch:

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Improve test coverage of StreamsThread
> --
> 
> Key: KAFKA-4641
> URL: https://issues.apache.org/jira/browse/KAFKA-4641
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Damian Guy
> Priority: Minor
> Labels: newbie
>
> Some methods in {{StreamThread}} have little or no coverage.
> In particular:
> {{maybeUpdateStandbyTasks}} has little to no coverage
> Committing of StandbyTasks in {{commitAll}}
> {{maybePunctuate}}
> {{commitOne}} - no tests for exceptions
> {{unAssignChangeLogPartitions}} - no tests for exceptions
> {{addStreamsTask}} - no tests for exceptions
> {{runLoop}}
> Please see coverage report attached to parent
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354172#comment-16354172 ] ASF GitHub Bot commented on KAFKA-6504: --- hachikuji closed pull request #4514: KAFKA-6504: Fix creation of a sensor to be specific to a metric group so it is not shared URL: https://github.com/apache/kafka/pull/4514

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 69614948147..4ee8ad6a808 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -677,34 +677,34 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
         // prevent collisions by removing any previously created metrics in this group.
         metricGroup.close();
-        sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+        sinkRecordRead = metricGroup.sensor("sink-record-read");
         sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
         sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
-        sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+        sinkRecordSend = metricGroup.sensor("sink-record-send");
         sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
         sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
-        sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+        sinkRecordActiveCount = metricGroup.sensor("sink-record-active-count");
         sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
         sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax), new Max());
         sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg), new Avg());
-        partitionCount = metricGroup.metrics().sensor("partition-count");
+        partitionCount = metricGroup.sensor("partition-count");
         partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), new Value());
-        offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+        offsetSeqNum = metricGroup.sensor("offset-seq-number");
         offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), new Value());
-        offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
+        offsetCompletion = metricGroup.sensor("offset-commit-completion");
         offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
         offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
-        offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
+        offsetCompletionSkip = metricGroup.sensor("offset-commit-completion-skip");
         offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
         offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
-        putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+        putBatchTime = metricGroup.sensor("put-batch-time");
         putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
         putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), new Avg());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a172cdb45f0..473e2359578 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -509,7 +509,7 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics)
         pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
         pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
-
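The one-line change repeated throughout the diff, `metricGroup.metrics().sensor(name)` to `metricGroup.sensor(name)`, matters because sensors in a shared metrics registry are looked up by name alone, so every task's metric group asking for "sink-record-read" received the same sensor object. The sketch below is a minimal stand-in model, not the real Kafka classes: `Metrics`, `Sensor`, and `MetricGroup` here are hypothetical simplifications that only illustrate the name-scoping idea.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the KAFKA-6504 bug: a registry keyed by sensor name
// alone hands the same Sensor to every task, while a per-group wrapper that
// prefixes the group id yields distinct sensors per task.
public class SensorScoping {

    // Stand-in for a shared metrics registry: get-or-create by name.
    static class Metrics {
        private final Map<String, Sensor> sensors = new HashMap<>();
        Sensor sensor(String name) {
            return sensors.computeIfAbsent(name, Sensor::new);
        }
    }

    static class Sensor {
        final String name;
        Sensor(String name) { this.name = name; }
    }

    // Stand-in for a per-task metric group.
    static class MetricGroup {
        private final Metrics metrics;
        private final String groupId;   // e.g. a task id like "sink-task-0"
        MetricGroup(Metrics metrics, String groupId) {
            this.metrics = metrics;
            this.groupId = groupId;
        }
        Metrics metrics() { return metrics; }   // buggy path: shared namespace
        Sensor sensor(String name) {            // fixed path: group-scoped name
            return metrics.sensor(groupId + ":" + name);
        }
    }

    public static void main(String[] args) {
        Metrics shared = new Metrics();
        MetricGroup task0 = new MetricGroup(shared, "task-0");
        MetricGroup task1 = new MetricGroup(shared, "task-1");

        // Before the patch: both tasks resolve to the SAME sensor object.
        boolean sharedBefore = task0.metrics().sensor("sink-record-read")
                == task1.metrics().sensor("sink-record-read");
        // After the patch: each task gets its own sensor.
        boolean sharedAfter = task0.sensor("sink-record-read")
                == task1.sensor("sink-record-read");
        System.out.println(sharedBefore + " " + sharedAfter); // true false
    }
}
```

Under this model, the per-task "not working" metrics were all three tasks recording into one shared sensor, which the group-scoped lookup fixes.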
[jira] [Created] (KAFKA-6537) Duplicate consumers after consumer group rebalance
Michael Golovanov created KAFKA-6537:

Summary: Duplicate consumers after consumer group rebalance
Key: KAFKA-6537
URL: https://issues.apache.org/jira/browse/KAFKA-6537
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.10.1.1
Reporter: Michael Golovanov

*_Deployment description_*

Kafka brokers are deployed on ten (10) nodes, and the Zookeeper cluster has seven (7) nodes. The Kafka broker nodes share bare-metal hosts with the Zookeeper nodes. The broker/Zookeeper host OS is Red Hat 7 and the JVM version is Java 8. The host names are grid48, grid237 and grid251. We have one topic with six (6) partitions. Kafka consumers are deployed on three (3) hosts, with two (2) consumers per host. All consumers belong to a single group.

*_Error description_*

After all consumers started, the topic partitions were balanced evenly:
grid237 owns partitions 0,1 (0 - consumer thread-0, 1 - consumer thread-1)
grid251 owns partitions 2,3 (2 - consumer thread-0, 3 - consumer thread-1)
grid48 owns partitions 4,5 (4 - consumer thread-0, 5 - consumer thread-1)

After some period of time we see chaotic revokes and assigns of partitions between consumers, and then the log shows all partitions assigned to one consumer on one node, grid48. But in reality all partitions are read not only by the thread-1 consumer but also by thread-0 on grid48, and all messages from the topic partitions are duplicated. The thread-0 consumer tries to commit message offsets and gets a commit error, while the thread-1 consumer commits offsets successfully.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
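The commit error the reporter sees on thread-0 is how the broker fences a consumer that fell out of a rebalance: offset commits carry the group generation, and a commit from a stale generation is rejected while the current assignee's commit succeeds. The duplicates come from the window in which the stale consumer keeps reading before it notices the revoke. The following is a toy model of that fencing only; `Coordinator` is a hypothetical class, not the Kafka group coordinator or client API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of generation-based fencing: each rebalance bumps the group
// generation, and a commit tagged with an old generation is refused
// (analogous to the ILLEGAL_GENERATION error a zombie consumer receives).
public class GenerationFencing {

    static class Coordinator {
        int generation = 0;
        final Map<Integer, Long> committed = new HashMap<>(); // partition -> offset

        // A rebalance produces a new assignment under a new generation.
        int rebalance() { return ++generation; }

        // Commit succeeds only for members of the current generation.
        boolean commit(int memberGeneration, int partition, long offset) {
            if (memberGeneration != generation)
                return false;               // stale member: commit rejected
            committed.put(partition, offset);
            return true;
        }
    }

    public static void main(String[] args) {
        Coordinator coord = new Coordinator();
        int threadZeroGen = coord.rebalance(); // thread-0 joins, owns partition 4
        int threadOneGen = coord.rebalance();  // rebalance moves partition 4 to thread-1

        // thread-0 kept consuming and now tries to commit: rejected.
        System.out.println(coord.commit(threadZeroGen, 4, 100L)); // false
        // thread-1, the current owner, commits the same offset: accepted.
        System.out.println(coord.commit(threadOneGen, 4, 100L));  // true
    }
}
```

In this model, messages read by the fenced thread-0 are re-delivered to thread-1 and processed twice, matching the duplication described above.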
[jira] [Resolved] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6528.
---
Resolution: Fixed
Fix Version/s: 1.1.0

> Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
> --
>
> Key: KAFKA-6528
> URL: https://issues.apache.org/jira/browse/KAFKA-6528
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.1.0
>
> {code:java}
> java.lang.AssertionError: expected:<108> but was:<123>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at org.junit.Assert.assertEquals(Assert.java:631)
> at kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755)
> at kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443)
> at kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353653#comment-16353653 ] ASF GitHub Bot commented on KAFKA-6528: --- rajinisivaram closed pull request #4526: KAFKA-6528: Fix transient test failure in testThreadPoolResize URL: https://github.com/apache/kafka/pull/4526

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 6d88d8de8b2..312123c3ab6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -70,9 +70,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.foreach { case (id, thread) =>
-        val removedPartitions = thread.partitionStates.partitionStates.asScala.map { case state =>
-          state.topicPartition -> new BrokerAndInitialOffset(thread.sourceBroker, state.value.fetchOffset)
-        }.toMap
+        val removedPartitions = thread.partitionsAndOffsets
         removeFetcherForPartitions(removedPartitions.keySet)
         if (id.fetcherId >= newSize)
           thread.shutdown()
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 925c33095a2..39a70321a6e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -312,6 +312,12 @@ abstract class AbstractFetcherThread(name: String,
     finally partitionMapLock.unlock()
   }
+  private[server] def partitionsAndOffsets: Map[TopicPartition, BrokerAndInitialOffset] = inLock(partitionMapLock) {
+    partitionStates.partitionStates.asScala.map { case state =>
+      state.topicPartition -> new BrokerAndInitialOffset(sourceBroker, state.value.fetchOffset)
+    }.toMap
+  }
+
 }

 object AbstractFetcherThread {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b7f0ae863a2..cb2ac5244a0 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,6 +69,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private val servers = new ArrayBuffer[KafkaServer]
   private val numServers = 3
+  private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
   private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
   private val adminClients = new ArrayBuffer[AdminClient]()
@@ -122,7 +123,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-    TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
     TestMetricsReporter.testReporters.clear()
@@ -203,7 +204,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   @Test
   def testKeyStoreAlter(): Unit = {
     val topic2 = "testtopic2"
-    TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
     // Start a producer and consumer that work with the current truststore.
     // This should continue working while changes are made
@@ -241,7 +242,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyProduceConsume(producer, consumer, 10, topic2)
     // Verify that all messages sent with retries=0 while keystores were being altered were consumed
-    stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+    stopAndVerifyProduceConsume(producerThread, consumerThread)
   }

   @Test
@@ -282,7 +283,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
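The core of this fix is the new `partitionsAndOffsets` method: the per-partition fetch offsets are copied inside `partitionMapLock`, so a thread-pool resize snapshots a consistent view instead of reading state that a concurrent fetch is still mutating. Below is a simplified illustration of that lock-protected-snapshot pattern in plain Java; `FetcherSnapshot` and its string partition keys are hypothetical stand-ins for the Scala original.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the pattern behind AbstractFetcherThread.partitionsAndOffsets:
// all reads and writes of the offset map go through one lock, and the
// snapshot is a defensive copy so later progress cannot mutate it.
public class FetcherSnapshot {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Map<String, Long> fetchOffsets = new HashMap<>();

    // Called by the fetcher as it makes progress on a partition.
    void advance(String partition, long offset) {
        partitionMapLock.lock();
        try {
            fetchOffsets.put(partition, offset);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Consistent copy taken under the same lock, for use during a resize.
    Map<String, Long> partitionsAndOffsets() {
        partitionMapLock.lock();
        try {
            return new HashMap<>(fetchOffsets);
        } finally {
            partitionMapLock.unlock();
        }
    }

    public static void main(String[] args) {
        FetcherSnapshot thread = new FetcherSnapshot();
        thread.advance("topic-0", 42L);
        Map<String, Long> snapshot = thread.partitionsAndOffsets();
        thread.advance("topic-0", 99L);              // later progress...
        System.out.println(snapshot.get("topic-0")); // ...leaves the snapshot at 42
    }
}
```

The test flakiness came from snapshotting without the lock: two fetchers could briefly own the same partition during migration, and the produce/consume counts the assertion compares drifted as a result.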