[jira] [Created] (KAFKA-8349) Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh
Kengo Seki created KAFKA-8349: - Summary: Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh Key: KAFKA-8349 URL: https://issues.apache.org/jira/browse/KAFKA-8349 Project: Kafka Issue Type: Improvement Components: tools Reporter: Kengo Seki Assignee: Kengo Seki Some shell scripts don't have corresponding batch files in bin\windows. To improve Windows platform support, I'd like to add the following batch files: - bin\windows\kafka-delete-records.bat - bin\windows\kafka-log-dirs.bat -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8349) Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh
[ https://issues.apache.org/jira/browse/KAFKA-8349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837011#comment-16837011 ] ASF GitHub Bot commented on KAFKA-8349: --- sekikn commented on pull request #6709: KAFKA-8349. Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh URL: https://github.com/apache/kafka/pull/6709 Some shell scripts don't have corresponding batch files in bin\windows. To improve Windows platform support, this PR adds the following batch files: - bin\windows\kafka-delete-records.bat - bin\windows\kafka-log-dirs.bat I confirmed that they work on Windows 10 Pro as follows: ``` PS C:\kafka-2.2.0> .\bin\windows\kafka-log-dirs.bat --bootstrap-server localhost:9092 --describe (snip) Querying brokers for log directories information Received log directory information from brokers 0 {"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"C:\\tmp\\kafka-logs","error":null,"partitions":[{"partition":"sandbox-0","size":213,"offsetLag":0,"isFuture":false}]}]}]} ``` ``` PS C:\kafka-2.2.0> .\bin\windows\kafka-delete-records.bat --bootstrap-server localhost:9092 --offset-json-file offset-json-file.txt (snip) Executing records delete operation Records delete operation completed: partition: sandbox-0 low_watermark: 2 ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-8350) Splitting batches should consider topic-level message size
huxihx created KAFKA-8350: - Summary: Splitting batches should consider topic-level message size Key: KAFKA-8350 URL: https://issues.apache.org/jira/browse/KAFKA-8350 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 2.3.0 Reporter: huxihx Currently, producers split batches based on the batch size. However, the split will never succeed when the batch size is much larger than the topic-level max message size. For instance, if the batch size is set to 8MB but we keep the default value for broker-side `message.max.bytes` (1000012, about 1MB), the producer will endlessly try to split a large batch but never succeed, as shown below: {code:java} [2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce response in correlation id 61 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce response in correlation id 62 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce response in correlation id 63 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce response in correlation id 64 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617){code} A better solution is to have the producer split based on the minimum of these two configs. However, it is tricky for the client to get the topic-level or broker-level config values.
There seem to be three ways to do this: # When the broker throws `RecordTooLargeException`, do not swallow its real message, since it already contains the max message size. If the message is not swallowed, the client can easily get it from the response. # Add code to issue `DescribeConfigsRequest` to retrieve the value. # If splitting fails, gradually lower the batch size until the split succeeds. For example, {code:java} // In RecordAccumulator.java private int steps = 1; .. public int splitAndReenqueue(ProducerBatch bigBatch) { .. Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps); if (dq.size() == 1) // split failed steps++; .. }{code} Do all of these make sense?
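The third option can be sketched as a small stand-alone simulation (all names here are illustrative, not actual `RecordAccumulator` code): it shows why splitting at the configured batch size alone never converges when the batch size exceeds the broker's max message size, while gradually shrinking the split size does.

```java
public class SplitSimulator {
    // Sketch of option 3: keep shrinking the split size (batchSize / steps)
    // until each resulting chunk fits under the broker's max message size.
    static int stepsNeeded(int batchSize, int maxMessageSize) {
        int steps = 1;
        while (batchSize / steps > maxMessageSize) {
            steps++; // split failed: every chunk is still too large, shrink further
        }
        return steps;
    }

    public static void main(String[] args) {
        // An 8 MB batch against the default broker-side message.max.bytes (1000012):
        // splitting by the batch size itself (steps = 1) can never succeed.
        System.out.println(stepsNeeded(8 * 1024 * 1024, 1000012)); // 9
        // A batch already under the limit needs no shrinking.
        System.out.println(stepsNeeded(16384, 1000012));           // 1
    }
}
```

With the minimum-of-the-two-configs approach from the description, the loop would be unnecessary because the first split size would already be small enough.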
[jira] [Updated] (KAFKA-8350) Splitting batches should consider topic-level message size
[ https://issues.apache.org/jira/browse/KAFKA-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx updated KAFKA-8350: -- Description: Currently, producers split batches based on the batch size. However, the split will never succeed when the batch size is much larger than the topic-level max message size. For instance, if the batch size is set to 8MB but we keep the default value for broker-side `message.max.bytes` (1000012, about 1MB), the producer will endlessly try to split a large batch but never succeed, as shown below: {code:java} [2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce response in correlation id 61 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce response in correlation id 62 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce response in correlation id 63 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617) [2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce response in correlation id 64 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617){code} A better solution is to have the producer split based on the minimum of these two configs. However, it is tricky for the client to get the topic-level or broker-level config values.
There seem to be three ways to do this: # When the broker throws `RecordTooLargeException`, do not swallow its real message, since it already contains the max message size. If the message is not swallowed, the client can easily get it from the response. # Add code to issue `DescribeConfigsRequest` to retrieve the value. # If splitting fails, gradually decrease the batch size until the split succeeds. For example, {code:java} // In RecordAccumulator.java private int steps = 1; .. public int splitAndReenqueue(ProducerBatch bigBatch) { .. Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps); if (dq.size() == 1) // split failed steps++; .. }{code} Do all of these make sense?
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew commented on KAFKA-8315: --- [~ableegoldman] Can I sanity-check my understanding here? From what I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected and the next record will be 2, not 1. Is that right? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is nowhere to pass a `Materialized` instance to the join > operation; only the group operation seems to support it. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:26 AM: [~ableegoldman] Can I sanity-check my understanding here? From what I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right?
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:27 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right?
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:27 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right?
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:31 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand, we ordered by the earliest time in each {{RecordQueue}}, then A would be selected, and record 1 would be read first.
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:32 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand, we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first.
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:40 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand, we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that each {{fifoQueue}} is populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldn't the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream?
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM: [~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand, we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that each {{fifoQueue}} is populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldn't the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? + for convenience I am quoting these times as second offsets from the start time 190258000ms
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM: [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}}*** and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? *** for convenience I am quoting these times as second offsets from the start time 190258000ms was (Author: the4thamigo_uk): [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. 
The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. 
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM: [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? + for convenience I am quoting these times as second offsets from the start time 190258000ms was (Author: the4thamigo_uk): [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. 
The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? + for convenience I am quoting these times as second offsets from the start time 190258000ms > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. 
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:43 AM: [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +++) and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? +++ for convenience I am quoting these times as second offsets from the start time 190258000ms was (Author: the4thamigo_uk): [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. 
The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? + for convenience I am quoting these times as second offsets from the start time 190258000ms > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. 
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:47 AM: [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +++) and the right stream {{partitionTime = 100}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? +++ for convenience I am quoting these times as second offsets from the start time 190258000ms was (Author: the4thamigo_uk): [~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. 
The {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read first. Taking this thought further, in my test data set, if I assume that the {{fifoQueue}} are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +++) and the right stream {{partitionTime = 1000}}. So, the left stream would be selected first, until all records are consumed in the left stream, then the right stream records would be consumed. In this case, wouldnt the vast majority of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream? +++ for convenience I am quoting these times as second offsets from the start time 190258000ms > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. 
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086 ] Andrew edited comment on KAFKA-8315 at 5/10/19 9:48 AM:

[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what I can tell, {{PartitionGroup.nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by {{RecordQueue.partitionTime}}, which is the most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}. If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{partitionTime = 4}} and B will have {{partitionTime = 3}}. So B will be selected by {{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that right? If, on the other hand, we ordered by the earliest time in each {{RecordQueue.fifoQueue}}, then A would be selected and record 1 would be read first.

Taking this thought further, in my test data set, if I assume that the {{fifoQueue}}s are populated in chunks of 10, then initially the left stream would have {{partitionTime = 10}} (see +++) and the right stream {{partitionTime = 100}}. So the left stream would be selected first, until all records up to time 100 are consumed from the left stream, and only then would the right stream's records be consumed. In this case, wouldn't many of the left join windows expire before the first record is read from the {{RecordQueue}} of the right stream?

+++ For convenience I am quoting these times as second offsets from the start time 190258000 ms.

> Cannot pass Materialized into a join operation - hence can't set retention period independent of grace
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
> The documentation says to use `Materialized`, not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; only the group operation seems to support it.
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
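The queue-ordering question in the comment above can be sketched without any Kafka dependency. The following stand-alone model (class and method names are illustrative, not Kafka's actual {{PartitionGroup}} code) uses a plain {{java.util.PriorityQueue}} to contrast ordering by {{partitionTime}} (the latest buffered timestamp) against ordering by each queue's head timestamp:

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;

public class QueueOrderDemo {
    // Minimal stand-in for a RecordQueue: a FIFO of record timestamps plus
    // "partitionTime", the largest timestamp read into the queue so far.
    static final class RecordQueue {
        final ArrayDeque<Long> fifo = new ArrayDeque<>();
        long partitionTime = Long.MIN_VALUE;

        RecordQueue(long... timestamps) {
            for (long ts : timestamps) {
                fifo.add(ts);
                partitionTime = Math.max(partitionTime, ts);
            }
        }
    }

    // Build queues A (1, 4) and B (2, 3) from the comment, pick the queue
    // that sorts first under the given ordering, and return its head record.
    static long nextRecord(Comparator<RecordQueue> order) {
        PriorityQueue<RecordQueue> nonEmptyQueues = new PriorityQueue<>(order);
        nonEmptyQueues.add(new RecordQueue(1, 4)); // A
        nonEmptyQueues.add(new RecordQueue(2, 3)); // B
        return nonEmptyQueues.poll().fifo.peek();
    }

    // Ordering by partitionTime (latest buffered timestamp): B (3) sorts
    // ahead of A (4), so the record with timestamp 2 is processed first.
    static long firstByPartitionTime() {
        return nextRecord(Comparator.comparingLong(q -> q.partitionTime));
    }

    // Ordering by each queue's earliest (head) timestamp: A's head is 1,
    // so the globally oldest record is processed first.
    static long firstByHeadTime() {
        return nextRecord(Comparator.comparingLong(q -> q.fifo.peek()));
    }

    public static void main(String[] args) {
        System.out.println("by partitionTime -> " + firstByPartitionTime()); // 2
        System.out.println("by head time -> " + firstByHeadTime());          // 1
    }
}
```

This reproduces exactly the asymmetry the comment describes: the latest-timestamp ordering picks B and emits record 2 before record 1, while the head-timestamp ordering would emit the globally oldest record first.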
[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837118#comment-16837118 ] ASF GitHub Bot commented on KAFKA-8348: --- mjsax commented on pull request #6707: KAFKA-8348 document optimized URL: https://github.com/apache/kafka/pull/6707 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Document of kafkaStreams improvement
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
> Issue Type: Improvement
> Components: documentation, streams
> Reporter: Lifei Chen
> Assignee: Lifei Chen
> Priority: Minor
>
> There is an out-of-date and erroneous example in KafkaStreams.java for the current version:
> * Map is not supported for the initial StreamsConfig properties
> * `int` does not support `toString`
> Related code:
> {code:java}
> // KafkaStreams.java
> *
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
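The two problems the issue calls out can be seen without Kafka at all. This stand-alone sketch (class name and config keys are illustrative) shows the stdlib side of both fixes: {{java.util.Properties}} in place of an unsupported Map, and {{String.valueOf(...)}} in place of calling {{toString()}} on a primitive {{int}}:

```java
import java.util.Properties;

public class StreamsConfigFixDemo {
    // Fix 1: build the configuration with java.util.Properties rather than
    // a Map, matching what the KafkaStreams constructor accepts.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-stream-processing-application");
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    // Fix 2: value.length() is a primitive int, which has no toString()
    // method; String.valueOf(...) produces the mapped String instead.
    static String mappedValue(String value) {
        return String.valueOf(value.length());
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(mappedValue("hello"));                           // 5
    }
}
```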
[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8348: --- Affects Version/s: 1.0.0 1.0.1 1.0.2 1.1.0 1.1.1 2.0.0 2.0.1 2.1.0 2.2.0 2.1.1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lifei Chen resolved KAFKA-8348. --- Resolution: Fixed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8348: --- Fix Version/s: 2.2.2 2.1.2 2.3.0 2.0.2 1.1.2 1.0.3 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837131#comment-16837131 ] Matthias J. Sax commented on KAFKA-8348: [~vahid] I marked this as fixed version 2.2.2 – if we roll a new RC, it should be updated to 2.2.1. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8348: --- Reviewer: (was: Matthias J. Sax) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837209#comment-16837209 ] Eric commented on KAFKA-8313: - Thanks [~guozhang] for the follow-up. Is there a place I can grab nightly builds of Kafka? > KafkaStreams state not being updated properly after shutdown > > > Key: KAFKA-8313 > URL: https://issues.apache.org/jira/browse/KAFKA-8313 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 > Environment: Single broker running on Ubuntu Linux. >Reporter: Eric >Assignee: Guozhang Wang >Priority: Minor > Fix For: 2.3.0 > > Attachments: kafka-8313-src.tgz, log.txt > > > I am running a KafkaStreams inside a DropWizard server and I am trying to > detect when my stream shuts down (in case a non-recoverable error occurs). I > was hoping I could use KafkaStreams.setStateListener() to be notified when a > state change occurs. When I query the state, KafkaStreams is stuck in the > REBALANCING state even though its threads are all DEAD. > > You can easily reproduce this by doing the following: > # Create a topic (I have one with 5 partitions) > # Create a simple Kafka stream consuming from that topic > # Create a StateListener and register it on that KafkaStreams > # Start the Kafka stream > # Once everything runs, delete the topic using kafka-topics.sh > When deleting the topic, you will see the StreamThreads' state transition > from RUNNING to PARTITION_REVOKED and you will be notified with the > KafkaStreams REBALANCING state. That's all good and expected. Then the > StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD, and the > KafkaStreams state is stuck in the REBALANCING state. I was expecting to > see a NOT_RUNNING state eventually... am I right? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837303#comment-16837303 ] John Roesler commented on KAFKA-8315: - Ah, yes. My apologies. I was mistaken about the PartitionGroup logic. I think you're right, and actually, [~ableegoldman] has filed https://issues.apache.org/jira/browse/KAFKA-8347, so I guess she agrees, too. I think it's actually a pretty straightforward change, and I don't think it needs a KIP either. Should we try to get this change in for the 15 May feature freeze for 2.3? Since you've taken the time to get familiar with the code, do you want to send a PR, [~the4thamigo_uk]? Offhand, I think we can just redefine RecordQueue.timestamp() to be the head timestamp instead of the high-watermark time. And, of course, write/update the relevant tests. > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is nowhere to pass a `Materialized` instance to the join > operation; it seems only the group operation supports it. > > Slack conversation here: > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
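The proposed change is easiest to see side by side. Below is a toy model in Python — not the actual `RecordQueue` class or its API — contrasting the current high-watermark semantics with the proposed head-record semantics for a partition queue's timestamp:

```python
from collections import deque

class ToyRecordQueue:
    """Toy model of a per-partition record queue; names are illustrative."""
    def __init__(self):
        self._records = deque()          # timestamps of buffered records
        self._max_seen = float("-inf")   # high-watermark bookkeeping

    def add(self, ts):
        self._records.append(ts)
        if ts > self._max_seen:
            self._max_seen = ts

    def poll(self):
        return self._records.popleft()

    def highwater_timestamp(self):
        # current semantics: largest timestamp ever observed in the queue
        return self._max_seen

    def head_timestamp(self):
        # proposed semantics: timestamp of the next record to be processed
        return self._records[0] if self._records else float("-inf")

q = ToyRecordQueue()
for ts in (10, 20, 30):
    q.add(ts)
q.poll()  # process the ts=10 record
print(q.head_timestamp())       # 20
print(q.highwater_timestamp())  # 30
```

With buffered-but-unprocessed records, the head timestamp lags the high watermark, which is the distinction the comment relies on.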
[jira] [Assigned] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-8187: -- Assignee: Bill Bejeck (was: Matthias J. Sax) > State store record loss across multiple reassignments when using standby tasks > -- > > Key: KAFKA-8187 > URL: https://issues.apache.org/jira/browse/KAFKA-8187 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Assignee: Bill Bejeck >Priority: Major > > Overview: > There is a race condition that can cause a partitioned state store to be > missing records up to an offset when using standby tasks. > When a reassignment occurs and a task is migrated to a StandbyTask in another > StreamThread/TaskManager on the same JVM, there can be lock contention that > prevents the StandbyTask on the currently assigned StreamThread from > acquiring the lock, and it does not retry acquiring the lock because all of the > active StreamTasks are running for that StreamThread. If the StandbyTask does > not acquire the lock before the StreamThread enters the RUNNING state, > then the StandbyTask will not consume any records. If there is no subsequent > reassignment before the second execution of the stateDirCleaner thread, then > the task directory for the StandbyTask will be deleted. When the next > reassignment occurs, the offset that was read by the StandbyTask at creation > time before acquiring the lock will be written back to the state store > directory; this re-creates the state store directory. > An example: > StreamThread(A) and StreamThread(B) are running on the same JVM in the same > streams application. > StreamThread(A) has StandbyTask 1_0 > StreamThread(B) has no tasks > A reassignment is triggered by another host in the streams application fleet. 
> StreamThread(A) is notified with a PARTITIONS_REVOKED event of the thread's > one task > StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby > task for 1_0 > Here begins the race condition. > StreamThread(B) creates the StandbyTask, which reads the current checkpoint > from disk. > StreamThread(B) then attempts to updateNewAndRestoringTasks() for its > assigned tasks. [0] > StreamThread(B) initializes the new tasks for the active and standby tasks. > [1] [2] > StreamThread(B) attempts to lock the state directory for task 1_0 but fails > with a LockException [3], since StreamThread(A) still holds the lock. > StreamThread(B) returns true from updateNewAndRestoringTasks() due to the > check at [4], which only checks that the active assigned tasks are running. > StreamThread(B) state is set to RUNNING > StreamThread(A) closes the previous StandbyTask, specifically calling > closeStateManager() [5] > StreamThread(A) state is set to RUNNING > The streams application for this host has completed rebalancing and is now in > the RUNNING state. > State at this point is the following: the state directory exists for 1_0 and all > data is present. > Then, at a period that is 1 to 2 intervals of [6] (which defaults to 10 > minutes) after the reassignment has completed, the stateDirCleaner thread will > execute [7]. > The stateDirCleaner will then do [8], which finds the directory 1_0, finds > that there isn't an active lock for that directory, acquires the lock, and > deletes the directory. > State at this point is the following: the state directory does not exist for 1_0. > When the next reassignment occurs, the offset that was read by > StreamThread(B) during construction of the StandbyTask for 1_0 will be > written back to disk. This write re-creates the state store directory and > writes the .checkpoint file with the old offset. 
> State at this point is the following: State directory exists for 1_0 with a > '.checkpoint' file in it, but there is no other state store data in the > directory. > If this host is assigned the active task for 1_0 then all the history in the > state store will be missing from before the offset that was read at the > previous reassignment. > If this host is assigned the standby task for 1_0 then the lock will be > acquired and the standby will start to consume records, but it will still be > missing all records from before the offset that was read at the previous > reassignment. > If this host is not assigned 1_0, then the state directory will get cleaned > up by the stateDirCleaner thread 10 to 20 minutes later and the record loss > issue will be hidden. > [0] > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869 > [1] > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#
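The race described above can be compressed into a deterministic toy walk-through (plain Python; `StateDir`, the lock, and the cleaner are illustrative stand-ins for the Streams internals, not real APIs):

```python
class StateDir:
    """Toy stand-in for a task state directory with a lock and contents."""
    def __init__(self):
        self.locked_by = None
        self.contents = {"checkpoint": 100, "store-data": "..."}

    def try_lock(self, owner):
        if self.locked_by is None:
            self.locked_by = owner
            return True
        return False

    def unlock(self, owner):
        if self.locked_by == owner:
            self.locked_by = None

d = StateDir()
d.try_lock("A")                              # StreamThread(A) still holds the lock
stale_checkpoint = d.contents["checkpoint"]  # B reads the checkpoint at task creation
acquired = d.try_lock("B")                   # fails (LockException); never retried
# B reports RUNNING anyway: only *active* tasks gate the running check.
d.unlock("A")                                # A closes its old StandbyTask
if d.locked_by is None:                      # stateDirCleaner: unlocked -> delete
    d.contents = {}
# Next reassignment: B writes back the checkpoint it read at creation time,
# re-creating the directory with only a .checkpoint file and no store data.
d.contents = {"checkpoint": stale_checkpoint}
print(acquired, d.contents)  # False {'checkpoint': 100}
```

The end state matches the report: a directory containing only a stale checkpoint, so any store history before that offset is gone.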
[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837314#comment-16837314 ] Vahid Hashemian commented on KAFKA-8348: Sounds good [~mjsax]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837348#comment-16837348 ] John Roesler commented on KAFKA-7895: - FYI, 2.2.1 RC0 vote is in progress. (https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E) Please feel free to test it out, and reply on the vote thread if you have some trouble with it. > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Blocker > Fix For: 2.1.2, 2.2.1 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? 
> Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes
[ https://issues.apache.org/jira/browse/KAFKA-8254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837350#comment-16837350 ] John Roesler commented on KAFKA-8254: - FYI, 2.2.1 RC0 vote is in progress. (https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E) Please feel free to test it out, and reply on the vote thread if you have some trouble with it. > Suppress incorrectly passes a null topic to the serdes > -- > > Key: KAFKA-8254 > URL: https://issues.apache.org/jira/browse/KAFKA-8254 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.1.1 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > For example, in KTableSuppressProcessor, we do: > {noformat} > final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, > key)); > {noformat} > This violates the contract of Serializer (and Deserializer), and breaks > integration with known Serde implementations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
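A minimal sketch of why passing a null topic breaks topic-aware serdes (Python toy model; `SchemaAwareSerializer` is hypothetical, standing in for e.g. schema-registry serdes that resolve a schema per topic):

```python
class SchemaAwareSerializer:
    """Hypothetical serializer that legitimately depends on the topic name,
    e.g. to pick a per-topic schema, as schema-registry serdes do."""
    def serialize(self, topic, value):
        if topic is None:
            # The Serializer contract says `topic` is the destination topic.
            raise ValueError("topic must not be null")
        return f"{topic}:{value}".encode("utf-8")

s = SchemaAwareSerializer()
print(s.serialize("orders", 42))  # b'orders:42'
try:
    s.serialize(None, 42)  # what the suppress buffer effectively did
except ValueError as e:
    print("integration breaks:", e)
```

Any serde implementation that inspects the topic argument fails the same way, which is why violating the contract breaks "known Serde implementations" rather than just one.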
[jira] [Commented] (KAFKA-8289) KTable, Long> can't be suppressed
[ https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837351#comment-16837351 ] John Roesler commented on KAFKA-8289: - FYI, 2.2.1 RC0 vote is in progress. (https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E) Please feel free to test it out, and reply on the vote thread if you have some trouble with it. > KTable, Long> can't be suppressed > --- > > Key: KAFKA-8289 > URL: https://issues.apache.org/jira/browse/KAFKA-8289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.1.1 > Environment: Broker on a Linux, stream app on my win10 laptop. > I add one row log.message.timestamp.type=LogAppendTime to my broker's > server.properties. stream app all default config. >Reporter: Xiaolin Jia >Assignee: John Roesler >Priority: Blocker > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > I write a simple stream app followed official developer guide [Stream > DSL|[https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results]]. > but I got more than one [Window Final > Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31] > from a session time window. 
> time ticker A -> (4,A) / 25s, > time ticker B -> (4, B) / 25s all send to the same topic > below is my stream app code > {code:java} > kstreams[0] > .peek((k, v) -> log.info("--> ping, k={},v={}", k, v)) > .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String())) > .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20))) > .count() > .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) > .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), > k.key(), v)); > {code} > {{here is my log print}} > {noformat} > 2019-04-24 20:00:26.142 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:00:47.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556106587744, > endMs=1556107129191},k=A,v=20 > 2019-04-24 20:00:51.071 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:01:16.065 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:01:41.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:06.069 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:31.066 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:02:56.208 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:03:21.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:03:46.078 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:04.684 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:04:11.069 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:19.371 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107426409},k=B,v=9 > 2019-04-24 20:04:19.372 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > 
endMs=1556107445012},k=A,v=1 > 2019-04-24 20:04:29.604 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:04:36.067 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:04:49.715 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=1556107451397},k=B,v=10 > 2019-04-24 20:04:49.716 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107445012, > endMs=1556107469935},k=A,v=2 > 2019-04-24 20:04:54.593 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:05:01.070 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=B > 2019-04-24 20:05:19.599 INFO --- [-StreamThread-1] c.g.k.AppStreams > : --> ping, k=4,v=A > 2019-04-24 20:05:20.045 INFO --- [-StreamThread-1] c.g.k.AppStreams > : window=Window{startMs=1556107226473, > endMs=155610747639
[jira] [Commented] (KAFKA-8204) Streams may flush state stores in the incorrect order
[ https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837349#comment-16837349 ] John Roesler commented on KAFKA-8204: - FYI, 2.2.1 RC0 vote is in progress. (https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E) Please feel free to test it out, and reply on the vote thread if you have some trouble with it. > Streams may flush state stores in the incorrect order > - > > Key: KAFKA-8204 > URL: https://issues.apache.org/jira/browse/KAFKA-8204 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.3.0, 2.2.1 > > > Cached state stores may forward records during a flush call, so Streams > should flush the stores in topological order. Otherwise, Streams may flush a > downstream store before an upstream one, resulting in sink results being > committed without the corresponding state changelog updates being committed. > This behavior is partly responsible for the bug reported in KAFKA-7895 . > The fix is simply to flush the stores in topological order, then when the > upstream store forwards records to a downstream stateful processor, the > corresponding state changes will be correctly flushed as well. > An alternative would be to repeatedly call flush on all state stores until > they report there is nothing left to flush, but this requires a public API > change to enable state stores to report whether they need a flush or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
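The flush-ordering hazard can be sketched with a toy cache model (Python; `CachedStore` is illustrative, not the Streams API). Flushing the downstream store first means the record forwarded by the upstream flush never reaches the downstream changelog before commit:

```python
class CachedStore:
    def __init__(self, name, downstream=None):
        self.name = name
        self.downstream = downstream
        self.cache = []      # dirty records not yet flushed
        self.changelog = []  # records committed to this store's changelog

    def put(self, record):
        self.cache.append(record)

    def flush(self):
        # Flushing a cached store forwards its records downstream.
        for r in self.cache:
            self.changelog.append(r)
            if self.downstream is not None:
                self.downstream.put(r)
        self.cache.clear()

def run(flush_order):
    down = CachedStore("down")
    up = CachedStore("up", downstream=down)
    up.put("r1")
    for store in flush_order(up, down):
        store.flush()
    return down.changelog

# Wrong order: downstream flushed first, so the record forwarded by the
# upstream flush is still sitting in the downstream cache at commit time.
print(run(lambda up, down: [down, up]))  # downstream changelog is empty
# Topological order: upstream first, downstream then sees and commits it.
print(run(lambda up, down: [up, down]))  # downstream changelog has the record
```

This is the simple-fix branch the ticket describes: flush in topological order and the forwarded records are flushed too.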
[jira] [Created] (KAFKA-8351) Log cleaner must handle transactions spanning multiple segments
Jason Gustafson created KAFKA-8351: -- Summary: Log cleaner must handle transactions spanning multiple segments Key: KAFKA-8351 URL: https://issues.apache.org/jira/browse/KAFKA-8351 Project: Kafka Issue Type: Bug Components: log cleaner Reporter: Jason Gustafson Assignee: Jason Gustafson When cleaning transactions, we have to do some bookkeeping to keep track of which transactions still have data left around. As long as there is still data, we cannot remove the transaction marker. The problem is that we do this tracking at the segment level. We do not carry over the ongoing transaction state between segments. So if the first entry in a segment is a marker, we incorrectly clean it. In the worst case, data from a committed transaction could become aborted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
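A grossly simplified sketch of the bookkeeping bug (Python toy model; the real cleaner also compacts data and handles aborts, which this ignores). A marker may only be removed once no data from its transaction remains; resetting the ongoing-transaction state at each segment boundary wrongly cleans a marker whose data lives in an earlier segment:

```python
def clean(segments, carry_state_across_segments):
    # Each batch is (kind, txn_id); kind is "data" or "marker".
    kept = []
    ongoing = set()
    for segment in segments:
        if not carry_state_across_segments:
            ongoing = set()  # the bug: transaction state reset per segment
        for kind, txn in segment:
            if kind == "data":
                ongoing.add(txn)
                kept.append((kind, txn))
            elif kind == "marker" and txn in ongoing:
                kept.append((kind, txn))
            # a marker for a seemingly empty transaction is cleaned away
    return kept

# t1 has data in segment 1; its commit marker is the first entry of segment 2.
segments = [[("data", "t1")], [("marker", "t1")]]
print(clean(segments, carry_state_across_segments=False))  # marker wrongly removed
print(clean(segments, carry_state_across_segments=True))   # marker retained
```

Losing the marker while the data survives is what lets committed data be later treated as aborted, as the ticket warns.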
[jira] [Resolved] (KAFKA-7858) Replace JoinGroup request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7858. Resolution: Fixed > Replace JoinGroup request/response with automated protocol > -- > > Key: KAFKA-7858 > URL: https://issues.apache.org/jira/browse/KAFKA-7858 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC
[ https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-8286: -- Description: Tracking issue for implementing KIP-460. Tasks: # [Done] Design KIP # [Done] Review KIP # [Done] Approve KIP # [Done] Update RPC to support KIP # [Done] Update controller to support KIP # [Done] Create CLI command (kafka-leader-election) that implement KIP # [Done] Search and replace any usage of “preferred” in the code # Add test for command # [Done] Add test for controller functionality # Revisit all of the documentation - generate and audit the new javadocs # Deprecated since... needs to be update # Review PR # Merge PR was: Tracking issue for implementing KIP-460. Tasks: # [Done] Design KIP # [Done] Review KIP # [Done] Approve KIP # [Done] Update RPC to support KIP # [Done] Update controller to support KIP # [Done] Create CLI command (kafka-leader-election) that implement KIP # [Done] Search and replace any usage of “preferred” in the code # Add test for command # Add test for controller functionality # Revisit all of the documentation - generate and audit the new javadocs # Deprecated since... needs to be update # Review PR # Merge PR > KIP-460 Admin Leader Election RPC > - > > Key: KAFKA-8286 > URL: https://issues.apache.org/jira/browse/KAFKA-8286 > Project: Kafka > Issue Type: New Feature > Components: admin, clients, core >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for implementing KIP-460. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8247) Duplicate error handling in kafka-server-start.sh and actual Kafka class
[ https://issues.apache.org/jira/browse/KAFKA-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837510#comment-16837510 ] Sönke Liebau commented on KAFKA-8247: - I've experimented a bit with this: I removed all those checks and diffed the output of all shell scripts when run without parameters. See the results below; the number at the very end is the return code, but this stays the same in all instances. There is one additional script that differs: kafka-consumer-perf-test.sh - but this is only because it has a random element in the output that changes every run (how useful that is is another debate). Overall this looks good to me; in some cases the output is even significantly better, because we are no longer suppressing usage reporting from the Java class. The output from the two Connect classes might be improved a little, I guess. zookeeper-shell is a bit of a special case, as we change behavior by removing this check. Currently it returns with a help message; without that check it would default to connecting to localhost (behavior that I actually would prefer). If anybody has this call in a script somewhere, it would hang post-change, as it never returns. I'm not sure why someone would have that, though, save for testing purposes. So overall I think we'd be good to make the change, unsure about zookeeper-shell and the -daemon flag - anybody have an opinion on that? Ping [~hachikuji] since you noticed this duplicate check as well. 
{noformat} == ./connect-distributed.sh.out == Old output: USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties 0 - New output: [2019-05-10 09:53:16,310] INFO Usage: ConnectDistributed worker.properties (org.apache.kafka.connect.cli.ConnectDistributed:64) 0 == ./connect-standalone.sh.out == Old output: USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties 0 - New output: [2019-05-10 09:53:18,139] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:62) 0 == ./kafka-run-class.sh.out == Old output: USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts] 0 - New output: Usage: java [-options] class [args...] (to execute a class) or java [-options] -jar jarfile [args...] (to execute a jar file) where options include: -d32 use a 32-bit data model if available -d64 use a 64-bit data model if available -server to select the "server" VM The default VM is server, because you are running on a server-class machine. -cp -classpath A : separated list of directories, JAR archives, and ZIP archives to search for class files. -D= set a system property -verbose:[class|gc|jni] enable verbose output -version print product version and exit -version: Warning: this feature is deprecated and will be removed in a future release. require the specified version to run -showversion print product version and continue -jre-restrict-search | -no-jre-restrict-search Warning: this feature is deprecated and will be removed in a future release. include/exclude user private JREs in the version search -? 
-help print this help message -Xprint help on non-standard options -ea[:...|:] -enableassertions[:...|:] enable assertions with specified granularity -da[:...|:] -disableassertions[:...|:] disable assertions with specified granularity -esa | -enablesystemassertions enable system assertions -dsa | -disablesystemassertions disable system assertions -agentlib:[=] load native agent library , e.g. -agentlib:hprof see also, -agentlib:jdwp=help and -agentlib:hprof=help -agentpath:[=] load native agent library by full pathname -javaagent:[=] load Java programming language agent, see java.lang.instrument -splash: show splash screen with specified image See http://www.oracle.com/technetwork/java/javase/documentation/index.html for more details. 0 == ./kafka-server-start.sh.out == Old output: USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override property=value]* 0 - New output: [2019-05-10 09:53:58,855] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jCont
[jira] [Resolved] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tdp resolved KAFKA-8339. Resolution: Not A Problem Marking as resolved. This is working as expected from the streams side and the bug is in user-specific aggregation logic. > At-least-once delivery guarantee seemingly not met due to async commit / > produce failure race condition > --- > > Key: KAFKA-8339 > URL: https://issues.apache.org/jira/browse/KAFKA-8339 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: tdp >Priority: Major > > We have hit a race condition several times now between the StreamThread > committing its offsets for a task before the task has fully processed the > record through the topology. > > Consider part of a topology that looks like this: > > TOPIC T1 -> KSTREAM-SOURCE-NODE1 > KSTREAM-TRANSFORMVALUES-NODE1 > > KSTREAM-FILTER-NODE1 > KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC > T2 > > Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these > records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records > using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not > all necessary records from topic T1 have been consumed yet or an object > representing an aggregation of records if all necessary records from topic T1 > have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is > null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 > node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into > another object type. KSTREAM-SINK-NODE1 then attempts to produce this other > object to topic T2. > > The race condition occurs when the stream thread commits its offsets for > topic T1 after it consumes some or all of the necessary records from topic T1 > for an aggregation but before it gets the failure response back from the > async produce kicked off by KSTREAM-SINK-NODE1. 
> > We are running with a LogAndFailExceptionHandler, so when the stream thread > tries to commit the next time it fails and the stream thread shuts itself > down. The stream task is then reassigned to another stream thread, which > reads the offsets previously committed by the original stream thread. That > means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to > consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 > will never end up producing the required records to topic T2. This is why it > seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 > never successfully processed records and the stream application continued on > past it. > Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which > increases the likelihood of occurrence of the issue when all retries fail > since it widens the window at which the async offset commit can occur before > the produce record request is marked as failed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
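The failure mode tdp describes reduces to offset arithmetic. The sketch below is a hypothetical, Kafka-free model of the window: the async commit lands before the produce failure surfaces, so the replacement thread resumes past all of the aggregation's inputs and replays nothing.

```java
// Minimal sketch of the commit/produce-failure race (no Kafka APIs).
public class CommitRaceSketch {

    // Offset the restarted task resumes from, given the last committed offset.
    static long resumeOffset(long committedOffset) {
        return committedOffset + 1;
    }

    // How many of the aggregation's input records the new thread re-consumes.
    static long replayedRecords(long committedOffset, long totalRecords) {
        return totalRecords - resumeOffset(committedOffset);
    }

    public static void main(String[] args) {
        long totalRecords = 3;                  // say r0..r2 make up one aggregation
        // The thread consumes all three records; the async commit then fires
        // BEFORE the async produce to T2 reports its failure:
        long committedOffset = totalRecords - 1;
        // The produce fails, the thread shuts down, and a replacement thread
        // resumes from the committed offset:
        long replayed = replayedRecords(committedOffset, totalRecords);
        // Nothing is replayed, so the aggregation can never be rebuilt and the
        // record for topic T2 is never produced.
        System.out.println("records replayed after restart: " + replayed); // prints 0
    }
}
```

With exactly-once semantics the commit and the produce are in one transaction, which closes this window; the simulation only illustrates why at-least-once alone does not.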
[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian updated KAFKA-8348: --- Fix Version/s: (was: 2.2.2) 2.2.1 > Document of kafkaStreams improvement > > > Key: KAFKA-8348 > URL: https://issues.apache.org/jira/browse/KAFKA-8348 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1 >Reporter: Lifei Chen >Assignee: Lifei Chen >Priority: Minor > Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1 > > > There is an out-of-date and erroneous example in KafkaStreams.java for the > current version: > * a Map is not supported for the initial StreamsConfig properties > * `int` does not support `toString` > Related code: > {code:java} > // kafkaStreams.java > * > * A simple example might look like this: > * {@code > * Properties props = new Properties(); > * props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application"); > * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * > * StreamsBuilder builder = new StreamsBuilder(); > * builder.stream("my-input-topic").mapValues(value -> > String.valueOf(value.length())).to("my-output-topic"); > * > * KafkaStreams streams = new KafkaStreams(builder.build(), props); > * streams.start(); > * }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8352) Connect System Tests are failing with 404
Magesh kumar Nandakumar created KAFKA-8352: -- Summary: Connect System Tests are failing with 404 Key: KAFKA-8352 URL: https://issues.apache.org/jira/browse/KAFKA-8352 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Magesh kumar Nandakumar Assignee: Magesh kumar Nandakumar Fix For: 2.3.0 [https://github.com/apache/kafka/pull/6651] changed how the Connect REST server is started. As a result, the Connect system tests are failing with a 404 error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837590#comment-16837590 ] ASF GitHub Bot commented on KAFKA-8352: --- mageshn commented on pull request #6713: KAFKA-8352 : Fix Connect System test failure 404 Not Found URL: https://github.com/apache/kafka/pull/6713 Retry when the Connect service returns a 404 error. This can happen when Connect has just started and its REST resources are not yet initialized. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect System Tests are failing with 404 > - > > Key: KAFKA-8352 > URL: https://issues.apache.org/jira/browse/KAFKA-8352 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.3.0 > > > [https://github.com/apache/kafka/pull/6651] changed how the Connect REST > server is started. As a result, the Connect system tests are failing with a > 404 error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8265) Connect Client Config Override policy
[ https://issues.apache.org/jira/browse/KAFKA-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Magesh kumar Nandakumar updated KAFKA-8265: --- Affects Version/s: 2.3.0 Priority: Major (was: Minor) Fix Version/s: 2.3.0 > Connect Client Config Override policy > - > > Key: KAFKA-8265 > URL: https://issues.apache.org/jira/browse/KAFKA-8265 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Major > Fix For: 2.3.0 > > > Right now, each source connector and sink connector inherits its client > configurations from the worker properties. Within the worker properties, all > configurations that have a prefix of "producer." or "consumer." are applied > to all source connectors and sink connectors respectively. > We should allow the "producer." or "consumer." configurations to be overridden in > accordance with an override policy determined by the administrator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
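The inheritance rule the ticket describes (worker properties prefixed with "producer." or "consumer." become client configs with the prefix removed) can be sketched as a simple prefix filter. This is an illustrative stand-in, not Connect's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of prefix-based client config inheritance.
public class PrefixedConfigSketch {

    // Worker properties starting with `prefix` become client configs
    // with the prefix stripped; everything else is ignored here.
    static Map<String, String> clientConfigs(Properties worker, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (String name : worker.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                out.put(name.substring(prefix.length()), worker.getProperty(name));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties worker = new Properties();
        worker.setProperty("bootstrap.servers", "broker:9092");
        worker.setProperty("producer.acks", "all");
        worker.setProperty("consumer.max.poll.records", "500");

        // Every source connector's producer would see acks=all; every sink
        // connector's consumer would see max.poll.records=500.
        System.out.println(clientConfigs(worker, "producer."));
        System.out.println(clientConfigs(worker, "consumer."));
    }
}
```

The override policy the ticket proposes would sit in front of this step, accepting or rejecting per-connector values before they replace the worker-level ones.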
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aishwarya Gune updated KAFKA-8346: -- Description: The replica fetcher thread terminates in case one of the partitions being monitored fails. This leads to under-replicated partitions. The thread behavior can be improved by dropping that particular partition and continuing with the rest of the partitions. (was: The replica fetcher thread terminates in case one of the partitions being monitors fails. It leads to under-replicated partitions. The thread behavior can be improved by dropping that particular partition and continue handling rest of the partitions.) > Improve replica fetcher behavior in handling partition failures > --- > > Key: KAFKA-8346 > URL: https://issues.apache.org/jira/browse/KAFKA-8346 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Aishwarya Gune >Priority: Major > > The replica fetcher thread terminates in case one of the partitions being > monitored fails. This leads to under-replicated partitions. The thread > behavior can be improved by dropping that particular partition and continuing > with the rest of the partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8050) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8050: --- Fix Version/s: (was: 2.2.2) > remove KafkaMbean when network close > > > Key: KAFKA-8050 > URL: https://issues.apache.org/jira/browse/KAFKA-8050 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > > The broker can run out of memory when > * a large number of clients frequently close and reconnect > * the clientId changes on every reconnect, which gives rise to too many > KafkaMbeans in the broker > The reason is that the broker forgets to remove the KafkaMbean when it > detects that the connection has closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8048) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8048: --- Fix Version/s: (was: 2.2.2) > remove KafkaMbean when network close > > > Key: KAFKA-8048 > URL: https://issues.apache.org/jira/browse/KAFKA-8048 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > > The broker can run out of memory when > * a large number of clients frequently close and reconnect > * the clientId changes on every reconnect, which gives rise to too many > KafkaMbeans in the broker > The reason is that the broker forgets to remove the KafkaMbean when it > detects that the connection has closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE
[ https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837611#comment-16837611 ] Matthias J. Sax commented on KAFKA-8240: [~vahid] Are you rolling a new RC? You updated the other ticket but not this one. > Source.equals() can fail with NPE > - > > Key: KAFKA-8240 > URL: https://issues.apache.org/jira/browse/KAFKA-8240 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 2.3.0, 2.1.2, 2.2.2 > > > Reported on a PR: > [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795] > InternalTopologyBuilder#Source.equals() might fail with an NPE if > `topicPattern==null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
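For reference, a null-tolerant equals() of the kind this fix requires can be written with Objects.equals. `SourceLike` below is a hypothetical stand-in for InternalTopologyBuilder.Source, not the real class:

```java
import java.util.Objects;
import java.util.regex.Pattern;

// Sketch of a null-safe equals() for a Source-like node whose topicPattern
// may be null (named sources use a topic list instead of a pattern).
public class NullSafeEqualsSketch {

    static final class SourceLike {
        final String name;
        final Pattern topicPattern; // may be null

        SourceLike(String name, Pattern topicPattern) {
            this.name = name;
            this.topicPattern = topicPattern;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SourceLike)) return false;
            SourceLike other = (SourceLike) o;
            // Pattern has no value-based equals, so compare pattern strings.
            // Objects.equals tolerates null on either side; the NPE in the
            // ticket comes from dereferencing a null topicPattern directly.
            String p1 = topicPattern == null ? null : topicPattern.pattern();
            String p2 = other.topicPattern == null ? null : other.topicPattern.pattern();
            return Objects.equals(name, other.name) && Objects.equals(p1, p2);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, topicPattern == null ? null : topicPattern.pattern());
        }
    }

    public static void main(String[] args) {
        SourceLike a = new SourceLike("source-1", null);
        SourceLike b = new SourceLike("source-1", null);
        System.out.println(a.equals(b)); // true, no NPE
    }
}
```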
[jira] [Updated] (KAFKA-8051) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8051: --- Fix Version/s: (was: 2.2.2) > remove KafkaMbean when network close > > > Key: KAFKA-8051 > URL: https://issues.apache.org/jira/browse/KAFKA-8051 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > > The broker can run out of memory when > * a large number of clients frequently close and reconnect > * the clientId changes on every reconnect, which gives rise to too many > KafkaMbeans in the broker > The reason is that the broker forgets to remove the KafkaMbean when it > detects that the connection has closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8049) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8049: --- Fix Version/s: (was: 2.2.2) > remove KafkaMbean when network close > > > Key: KAFKA-8049 > URL: https://issues.apache.org/jira/browse/KAFKA-8049 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > > The broker can run out of memory when > * a large number of clients frequently close and reconnect > * the clientId changes on every reconnect, which gives rise to too many > KafkaMbeans in the broker > The reason is that the broker forgets to remove the KafkaMbean when it > detects that the connection has closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7978) Flaky Test SaslSslAdminClientIntegrationTest#testConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7978: --- Fix Version/s: (was: 2.2.2) (was: 2.3.0) > Flaky Test SaslSslAdminClientIntegrationTest#testConsumerGroups > --- > > Key: KAFKA-7978 > URL: https://issues.apache.org/jira/browse/KAFKA-7978 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/25/] > {quote}java.lang.AssertionError: expected:<2> but was:<0> at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotEquals(Assert.java:834) at > org.junit.Assert.assertEquals(Assert.java:645) at > org.junit.Assert.assertEquals(Assert.java:631) at > kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1157) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7946) Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup
[ https://issues.apache.org/jira/browse/KAFKA-7946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7946: --- Fix Version/s: (was: 2.2.2) > Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup > --- > > Key: KAFKA-7946 > URL: https://issues.apache.org/jira/browse/KAFKA-7946 > Project: Kafka > Issue Type: Bug > Components: admin, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J. Sax >Assignee: Gwen Shapira >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/17/] > {quote}java.lang.NullPointerException at > kafka.admin.DeleteConsumerGroupsTest.testDeleteNonEmptyGroup(DeleteConsumerGroupsTest.scala:96){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests
[ https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837616#comment-16837616 ] Matthias J. Sax commented on KAFKA-6789: [~vahid] Does this need a fix version update from 2.2.2 to 2.2.1? > Add retry logic in AdminClient requests > --- > > Key: KAFKA-6789 > URL: https://issues.apache.org/jira/browse/KAFKA-6789 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Manikumar >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.2 > > > In KafkaAdminClient, today we treat all error codes as fatal and set the > exception accordingly in the returned futures. But some error codes can > be retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We > could consider adding the retry logic internally in the admin client so that > users would not need to retry themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
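The internal-retry idea in the ticket can be sketched generically: keep reissuing the call while the failure is retriable, up to a bound, instead of surfacing it to the caller immediately. `RetriableException` here is a local stand-in, not Kafka's exception hierarchy:

```java
import java.util.function.Supplier;

// Hypothetical sketch of client-internal retries for retriable error codes.
public class RetrySketch {

    static class RetriableException extends RuntimeException {}

    // Retry `call` on retriable failures, up to `maxRetries` extra attempts;
    // non-retriable exceptions propagate immediately.
    static <T> T callWithRetries(Supplier<T> call, int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                if (attempt >= maxRetries) throw e; // retries exhausted
                // a real client would also back off here before retrying
            }
        }
    }

    public static void main(String[] args) {
        int[] failuresLeft = {2}; // e.g. coordinator still loading, twice
        String result = callWithRetries(() -> {
            if (failuresLeft[0]-- > 0) throw new RetriableException();
            return "ok";
        }, 5);
        System.out.println(result); // ok
    }
}
```

The design point is where the loop lives: moving it inside the client means every caller gets it for free, at the cost of the client needing to classify each error code as retriable or fatal.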
[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-8352: - Affects Version/s: 2.2.1 2.1.2 2.0.2 > Connect System Tests are failing with 404 > - > > Key: KAFKA-8352 > URL: https://issues.apache.org/jira/browse/KAFKA-8352 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.3.0 > > > [https://github.com/apache/kafka/pull/6651] modified the way how connect Rest > Server was started. As a result, the connect system tests are failing with a > 404 error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-8352: - Fix Version/s: 2.2.1 2.1.2 2.0.2 > Connect System Tests are failing with 404 > - > > Key: KAFKA-8352 > URL: https://issues.apache.org/jira/browse/KAFKA-8352 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1 > > > [https://github.com/apache/kafka/pull/6651] modified the way how connect Rest > Server was started. As a result, the connect system tests are failing with a > 404 error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE
[ https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837618#comment-16837618 ] Vahid Hashemian commented on KAFKA-8240: [~mjsax] Sorry I missed it. Updated. Feel free to update any others that should also be included in RC1. Thanks. > Source.equals() can fail with NPE > - > > Key: KAFKA-8240 > URL: https://issues.apache.org/jira/browse/KAFKA-8240 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 2.3.0, 2.1.2, 2.2.2 > > > Reported on a PR: > [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795] > InternalTopologyBuilder#Source.equals() might fail with an NPE if > `topicPattern==null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8240) Source.equals() can fail with NPE
[ https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian updated KAFKA-8240: --- Fix Version/s: (was: 2.2.2) 2.2.1 > Source.equals() can fail with NPE > - > > Key: KAFKA-8240 > URL: https://issues.apache.org/jira/browse/KAFKA-8240 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > Reported on a PR: > [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795] > InternalTopologyBuilder#Source.equals() might fail with an NPE if > `topicPattern==null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6789) Add retry logic in AdminClient requests
[ https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian updated KAFKA-6789: --- Fix Version/s: (was: 2.2.2) 2.2.1 > Add retry logic in AdminClient requests > --- > > Key: KAFKA-6789 > URL: https://issues.apache.org/jira/browse/KAFKA-6789 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Manikumar >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.1 > > > In KafkaAdminClient, today we treat all error codes as fatal and set the > exception accordingly in the returned futures. But some error codes can > be retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We > could consider adding the retry logic internally in the admin client so that > users would not need to retry themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests
[ https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837620#comment-16837620 ] Vahid Hashemian commented on KAFKA-6789: Updated. > Add retry logic in AdminClient requests > --- > > Key: KAFKA-6789 > URL: https://issues.apache.org/jira/browse/KAFKA-6789 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Manikumar >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.1 > > > In KafkaAdminClient, today we treat all error codes as fatal and set the > exception accordingly in the returned futures. But some error codes can > be retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We > could consider adding the retry logic internally in the admin client so that > users would not need to retry themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor
[ https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837631#comment-16837631 ] Almog Gavra commented on KAFKA-8305: Approved [https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic] > AdminClient should support creating topics with default partitions and > replication factor > - > > Key: KAFKA-8305 > URL: https://issues.apache.org/jira/browse/KAFKA-8305 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Almog Gavra >Priority: Major > > Today, the AdminClient creates topics by requiring a `NewTopic` object, which > must contain either partitions and replicas or an exact broker mapping (which > then infers partitions and replicas). Some users, however, could benefit from > just using the cluster default for replication factor but may not want to use > auto topic creation. > NOTE: I am planning on working on this, but I do not have permissions to > assign this ticket to myself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor
[ https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Almog Gavra updated KAFKA-8305: --- Summary: AdminClient should support creating topics with default partitions and replication factor (was: AdminClient should support creating topics with `default.replication.factor`) > AdminClient should support creating topics with default partitions and > replication factor > - > > Key: KAFKA-8305 > URL: https://issues.apache.org/jira/browse/KAFKA-8305 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Almog Gavra >Priority: Major > > Today, the AdminClient creates topics by requiring a `NewTopic` object, which > must contain either partitions and replicas or an exact broker mapping (which > then infers partitions and replicas). Some users, however, could benefit from > just using the cluster default for replication factor but may not want to use > auto topic creation. > NOTE: I am planning on working on this, but I do not have permissions to > assign this ticket to myself. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
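The defaulting behavior requested in KAFKA-8305 (covered by the KIP-464 proposal linked above) boils down to: treat an unset partition count or replication factor as "use the broker's num.partitions and default.replication.factor". A minimal sketch with hypothetical method names, not the final AdminClient API:

```java
import java.util.Optional;

// Illustrative sketch of KIP-464-style defaulting for topic creation.
public class TopicDefaultsSketch {

    // If the caller left the partition count unset, fall back to the
    // broker-side num.partitions config.
    static int effectivePartitions(Optional<Integer> requested, int brokerNumPartitions) {
        return requested.orElse(brokerNumPartitions);
    }

    // Likewise for replication factor and default.replication.factor.
    static short effectiveReplication(Optional<Short> requested, short brokerDefaultRf) {
        return requested.orElse(brokerDefaultRf);
    }

    public static void main(String[] args) {
        // Caller specifies nothing; broker has num.partitions=6:
        System.out.println(effectivePartitions(Optional.empty(), 6));                 // 6
        // Caller pins replication factor to 2; broker default of 3 is ignored:
        System.out.println(effectiveReplication(Optional.of((short) 2), (short) 3));  // 2
    }
}
```

The key difference from auto topic creation is that the defaults apply only to an explicit create request, so topics are still never created implicitly.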
[jira] [Created] (KAFKA-8353) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms after enabling SASL PLAINTEXT authentication
goutham created KAFKA-8353: -- Summary: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms after enabling SASL PLAINTEXT authentication Key: KAFKA-8353 URL: https://issues.apache.org/jira/browse/KAFKA-8353 Project: Kafka Issue Type: Bug Components: documentation, security Affects Versions: 0.10.2.1 Reporter: goutham I'm running into a timeout exception when I try to run a producer and consumer through Java or the console. *kafka server.properties* advertised.host.name=127.0.0.1 listeners=SASL_PLAINTEXT://127.0.0.1:9090 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9090 *kafka server JAAS conf* KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_test="test"; }; *client producer/consumer properties* String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, "test", "test"); brokers.delete(brokers.length() - 1, brokers.length()); properties.put("bootstrap.servers", brokers.toString()); properties.put("retry.backoff.ms", "1000"); properties.put("reconnect.backoff.ms", "1000"); properties.put("max.request.size", "5242880"); properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); // properties.put("metadata.max.age.ms", 15000); //Refresh topic partition leadership every 15 seconds properties.put("sasl.jaas.config", jaasCfg); properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); 
properties.put("ssl.client.auth", "none"); I also added the KAFKA_OPTS environment variable with the JAAS config location so the console consumer can use that login module. I am running single-node Kafka (0.10.2) with ZooKeeper (3.4.9). With these settings both the broker and ZooKeeper come up, but clients with valid credentials are not able to write to or read from the broker. I pretty much followed the steps in the Apache Kafka documentation. Please advise. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8221) Augment LeaveGroupRequest to batch operation
[ https://issues.apache.org/jira/browse/KAFKA-8221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837646#comment-16837646 ] ASF GitHub Bot commented on KAFKA-8221: --- abbccdda commented on pull request #6714: KAFKA-8221 & KIP-345 part-3: Add batch leave group request URL: https://github.com/apache/kafka/pull/6714 We are aiming to support batch LeaveGroup requests issued from the admin client. This diff is the first effort: bumping the LeaveGroup request version. Note that we relax the broker-side state check that restricts a request from setting both `member.id` and `group.instance.id`. Although that is not possible at the moment, we don't want to restrict future development that might handle leave-group from a static member too. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Augment LeaveGroupRequest to batch operation > > > Key: KAFKA-8221 > URL: https://issues.apache.org/jira/browse/KAFKA-8221 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Having a batch leave group request is a required protocol change to remove a > set of static members all at once. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837653#comment-16837653 ] Guozhang Wang commented on KAFKA-8335: -- [~boquan] [~weichu] Thanks for reporting this issue. Jason and I have talked about this issue, and Jason has proposed a solution which we'll try to get into the upcoming 2.3.0 release. > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Assignee: Jason Gustafson >Priority: Major > Attachments: seg_april_25.zip, segment.zip > > > My colleague Weichu has already sent a mail to the kafka-users mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We have been using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of the __consumer_offsets partitions grew huge. > Some partitions went over 30 GB. This caused Kafka to take quite a long time to load > the "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc.) are > all preserved. 
Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8354) Replace SyncGroup request/response with automated protocol
Boyang Chen created KAFKA-8354: -- Summary: Replace SyncGroup request/response with automated protocol Key: KAFKA-8354 URL: https://issues.apache.org/jira/browse/KAFKA-8354 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8355) Add static membership to Range assignor
Boyang Chen created KAFKA-8355: -- Summary: Add static membership to Range assignor Key: KAFKA-8355 URL: https://issues.apache.org/jira/browse/KAFKA-8355 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837664#comment-16837664 ] ASF GitHub Bot commented on KAFKA-8335: --- hachikuji commented on pull request #6715: KAFKA-8335; Clean empty batches when sequence numbers are reused URL: https://github.com/apache/kafka/pull/6715 The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this: 1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined. 2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number. The complete fix for the second issue is probably to do the proper sequence number bookkeeping in the coordinator. For now, I have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Log cleaner skips Transactional mark and batch record, causing unlimited
> growth of __consumer_offsets
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-8335
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8335
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Boquan Tang
>            Assignee: Jason Gustafson
>            Priority: Major
>         Attachments: seg_april_25.zip, segment.zip
>
> My colleague Weichu already sent a mail to the kafka user mailing list
> regarding this issue, but we think it's worth having a ticket to track it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka
> cluster for a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100
> when exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transaction markers) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?
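The retention change described in the PR comment above (keep each producer's last batch, keyed by last offset rather than last sequence number) can be illustrated with a small sketch. The class and method names below are hypothetical stand-ins, not Kafka's actual LogCleaner code; they only demonstrate why sequence numbers are ambiguous once the coordinator reuses sequence 0.

```java
import java.util.*;

// Illustrative sketch (not Kafka's LogCleaner): decide which batches to retain
// so that each producerId keeps its latest state. Keying by sequence number is
// ambiguous because the coordinator restarts sequences at 0, so many batches
// can share a sequence; a batch's last offset is unique within the log.
public class CleanerRetention {

    // A minimal stand-in for a record batch's metadata.
    public static final class Batch {
        final long producerId;
        final int lastSequence;
        final long lastOffset;
        public Batch(long producerId, int lastSequence, long lastOffset) {
            this.producerId = producerId;
            this.lastSequence = lastSequence;
            this.lastOffset = lastOffset;
        }
    }

    // Buggy variant: retain a batch if it carries the producer's last *sequence*.
    // With reused sequence numbers, several batches qualify.
    public static List<Long> retainBySequence(List<Batch> log) {
        Map<Long, Integer> lastSeq = new HashMap<>();
        for (Batch b : log)
            lastSeq.put(b.producerId, b.lastSequence);
        List<Long> retained = new ArrayList<>();
        for (Batch b : log)
            if (lastSeq.get(b.producerId) == b.lastSequence)
                retained.add(b.lastOffset);
        return retained;
    }

    // Fixed variant: retain the batch that carries the producer's last *offset*.
    // Offsets are unique, so exactly one batch per producerId is kept.
    public static List<Long> retainByOffset(List<Batch> log) {
        Map<Long, Long> lastOffset = new HashMap<>();
        for (Batch b : log)
            lastOffset.put(b.producerId, b.lastOffset);
        List<Long> retained = new ArrayList<>();
        for (Batch b : log)
            if (lastOffset.get(b.producerId) == b.lastOffset)
                retained.add(b.lastOffset);
        return retained;
    }

    public static void main(String[] args) {
        // Coordinator-style writes: producer 7 writes two batches, both starting
        // (and ending) at sequence 0.
        List<Batch> log = Arrays.asList(
            new Batch(7L, 0, 100L),
            new Batch(7L, 0, 200L));
        System.out.println(retainBySequence(log)); // [100, 200] - both kept
        System.out.println(retainByOffset(log));   // [200] - only the latest
    }
}
```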
[jira] [Created] (KAFKA-8356) Add static membership to Round Robin assignor
Boyang Chen created KAFKA-8356:
-----------------------------------

             Summary: Add static membership to Round Robin assignor
                 Key: KAFKA-8356
                 URL: https://issues.apache.org/jira/browse/KAFKA-8356
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Boyang Chen
[jira] [Updated] (KAFKA-8224) Add static member id into Subscription Info for better rebalance behavior
[ https://issues.apache.org/jira/browse/KAFKA-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-8224:
-------------------------------
    Issue Type: Improvement  (was: Sub-task)
        Parent: (was: KAFKA-7018)

> Add static member id into Subscription Info for better rebalance behavior
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8224
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8224
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> Based on the discussion in [https://github.com/apache/kafka/pull/6177] and
> KIP-345, we plan to make better use of static member info when making
> rebalance decisions. For example, assignors such as Range or Round Robin
> could become stickier by relying on static ids instead of
> coordinator-generated ids.
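The stickiness argument in the ticket above can be shown with a toy range-style assignor. This is a hypothetical sketch, not the actual RangeAssignor: it only demonstrates that because such assignors sort by member id, a coordinator-generated id that changes across a restart can reshuffle every member's partitions, while a stable static id keeps the assignment unchanged.

```java
import java.util.*;

// Illustrative sketch of why assigning by static member id is "stickier" than
// assigning by coordinator-generated id (hypothetical names, not Kafka's
// assignor code). A range-style assignor sorts member ids and hands out
// contiguous partition ranges; a stable sort order means stable assignments.
public class StickyByStaticId {

    // Assign numPartitions partitions range-style over the sorted member ids.
    public static Map<String, List<Integer>> rangeAssign(List<String> memberIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int per = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            assignment.put(sorted.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // With coordinator-generated ids, a restarted member re-joins under a
        // brand-new id; here "consumer-aaa" comes back as "consumer-zzz", which
        // flips the sort order and moves consumer-bbb's partitions too.
        System.out.println(rangeAssign(Arrays.asList("consumer-aaa", "consumer-bbb"), 4));
        System.out.println(rangeAssign(Arrays.asList("consumer-zzz", "consumer-bbb"), 4));
        // With static ids, the restarted member re-joins under the same id and
        // the assignment is unchanged.
        System.out.println(rangeAssign(Arrays.asList("instance-1", "instance-2"), 4));
    }
}
```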
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune updated KAFKA-8346:
----------------------------------
    External issue URL: https://github.com/apache/kafka/pull/6716

> Improve replica fetcher behavior in handling partition failures
> ---------------------------------------------------------------
>
>                 Key: KAFKA-8346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8346
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Aishwarya Gune
>            Priority: Major
>
> The replica fetcher thread terminates when one of the partitions it
> monitors fails. This leads to under-replicated partitions. The thread's
> behavior can be improved by dropping that particular partition and
> continuing with the rest of the partitions.
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune updated KAFKA-8346:
----------------------------------
    Description:
        The replica fetcher thread terminates in case one of the partitions being
        monitored fails. This leads to under-replicated partitions. The thread
        behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
        KIP-461: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

        was: The replica fetcher thread terminates in case one of the partitions
        being monitored fails. This leads to under-replicated partitions. The
        thread behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune updated KAFKA-8346:
----------------------------------
    External issue URL:   (was: https://github.com/apache/kafka/pull/6716)
[jira] [Commented] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837682#comment-16837682 ]

ASF GitHub Bot commented on KAFKA-8352:
---------------------------------------

rhauch commented on pull request #6713: KAFKA-8352: Fix Connect System test failure 404 Not Found
URL: https://github.com/apache/kafka/pull/6713

> Connect System Tests are failing with 404
> -----------------------------------------
>
>                 Key: KAFKA-8352
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8352
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>            Reporter: Magesh kumar Nandakumar
>            Assignee: Magesh kumar Nandakumar
>            Priority: Blocker
>             Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect
> REST server is started. As a result, the Connect system tests are failing
> with a 404 error.
[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-8352:
---------------------------------
    Reviewer: Randall Hauch
[jira] [Resolved] (KAFKA-8352) Connect System Tests are failing with 404
[ https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-8352.
----------------------------------
    Resolution: Fixed

Thanks, [~mageshn]!
[jira] [Assigned] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune reassigned KAFKA-8346:
-------------------------------------
    Assignee: Aishwarya Gune
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune updated KAFKA-8346:
----------------------------------
    Description:
        The replica fetcher thread terminates in case one of the partitions being
        monitored fails. This leads to under-replicated partitions. The thread
        behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
        KIP-461: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]
        PR draft: [https://github.com/apache/kafka/pull/6716]

        was: The replica fetcher thread terminates in case one of the partitions
        being monitored fails. This leads to under-replicated partitions. The
        thread behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
        KIP-461: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]
[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aishwarya Gune updated KAFKA-8346:
----------------------------------
    Description:
        The replica fetcher thread terminates in case one of the partitions being
        monitored fails. This leads to under-replicated partitions. The thread
        behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
        KIP-461: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

        was: The replica fetcher thread terminates in case one of the partitions
        being monitored fails. This leads to under-replicated partitions. The
        thread behavior can be improved by dropping that particular partition and
        continuing with the rest of the partitions.
        KIP-461: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]
        PR draft: [https://github.com/apache/kafka/pull/6716]
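The behavior KIP-461 proposes for KAFKA-8346 can be sketched in a few lines. The names below are hypothetical, not the actual ReplicaFetcherThread code: the point is that a per-partition failure is caught, the offending partition is dropped from the fetch set, and the thread keeps serving the remaining partitions instead of dying.

```java
import java.util.*;
import java.util.function.Function;

// Illustrative sketch of KIP-461's idea (hypothetical names, not Kafka's
// ReplicaFetcherThread): instead of letting one failing partition kill the
// whole fetcher thread, mark that partition as failed and keep fetching the rest.
public class TolerantFetcher {

    // Run one fetch pass over the active partitions. Returns the partitions
    // that failed and were dropped; the rest stay under active fetch.
    public static Set<String> fetchOnce(Set<String> partitions, Function<String, Boolean> fetch) {
        Set<String> failed = new HashSet<>();
        for (Iterator<String> it = partitions.iterator(); it.hasNext(); ) {
            String tp = it.next();
            try {
                fetch.apply(tp); // may throw for a corrupt/failed partition
            } catch (RuntimeException e) {
                failed.add(tp);  // drop just this partition...
                it.remove();     // ...and carry on with the others
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Set<String> partitions = new HashSet<>(Arrays.asList("t-0", "t-1", "t-2"));
        Set<String> failed = fetchOnce(partitions, tp -> {
            if (tp.equals("t-1")) throw new RuntimeException("log dir failure");
            return true;
        });
        // t-1 is dropped; t-0 and t-2 remain under fetch.
        System.out.println("failed=" + failed + " stillFetching=" + partitions);
    }
}
```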
[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator
[ https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837729#comment-16837729 ]

Guozhang Wang commented on KAFKA-7206:
--------------------------------------

[~sagarrao] I think [~shung] has tried to implement this idea before, but it turned out to be quite complex (maybe Yishun can give you some pointers to his past contributions). If this is your first work in the codebase, it may be very ambitious. Anyway, it is up to you and Yishun.

> Enable batching in FindCoordinator
> ----------------------------------
>
>                 Key: KAFKA-7206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7206
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: Yishun Guan
>            Assignee: Yishun Guan
>            Priority: Critical
>              Labels: needs-discussion, needs-kip, newbie++
>
> To quote [~guozhang]:
> "The proposal is that we extend FindCoordinatorRequest to carry multiple
> consumer ids: today each FindCoordinatorRequest contains only a single
> consumer id, so in our scenario we still need to send N requests for N
> consumer groups. If we can ask for coordinators in a single request, the
> workflow could be simplified to:
> 1. Send a single FindCoordinatorRequest to a broker asking for the
>    coordinators of all consumer groups.
>    1.a) Note that the response may still succeed in finding some
>    coordinators while returning errors for others, and we need to handle
>    them at that granularity (see below).
> 2. Then, for the collected coordinators, group the consumer groups by
>    coordinator id and send one request per coordinator destination.
> Note that this change would require the version to be bumped up to
> FIND_COORDINATOR_REQUEST_V3 for such protocol changes, and the RESPONSE
> version should be bumped up as well in order to include multiple
> coordinators."
> A KIP is needed.
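The two-step workflow quoted in the ticket above can be sketched as follows. The names are hypothetical, not actual AdminClient code: step 1 is assumed to have already resolved a coordinator id (or a per-group error) for each group, and the sketch shows step 2, bucketing the successful groups by coordinator so that one follow-up request goes to each broker.

```java
import java.util.*;

// Illustrative sketch of step 2 of the batched-FindCoordinator workflow
// (hypothetical names, not Kafka's AdminClient): bucket groups by their
// resolved coordinator so one request per coordinator suffices.
public class CoordinatorBatcher {

    // lookups maps group id -> coordinator broker id; a null value stands in
    // for a per-group error that must be retried or surfaced separately.
    public static Map<Integer, List<String>> groupByCoordinator(Map<String, Integer> lookups) {
        Map<Integer, List<String>> byCoordinator = new TreeMap<>();
        for (Map.Entry<String, Integer> e : lookups.entrySet()) {
            if (e.getValue() == null) continue; // handle errors at group granularity
            byCoordinator.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        // Sort for deterministic output in this demo.
        for (List<String> groups : byCoordinator.values()) Collections.sort(groups);
        return byCoordinator;
    }

    public static void main(String[] args) {
        Map<String, Integer> lookups = new HashMap<>();
        lookups.put("group-a", 0);
        lookups.put("group-b", 1);
        lookups.put("group-c", 0);
        lookups.put("group-d", null); // e.g. the lookup for this group errored
        // One follow-up request per coordinator instead of one per group.
        System.out.println(groupByCoordinator(lookups));
        // {0=[group-a, group-c], 1=[group-b]}
    }
}
```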
[jira] [Commented] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837764#comment-16837764 ]

ASF GitHub Bot commented on KAFKA-8347:
---------------------------------------

ableegoldman commented on pull request #6719: KAFKA-8347: Choose next record to process by timestamp
URL: https://github.com/apache/kafka/pull/6719

When choosing the next record to process, we should look at the timestamp of each partition's head record and choose the lowest, rather than choosing the lowest of the partitions' stream times. This change effectively makes RecordQueue return the timestamp of the head record rather than its stream time. Stream time is removed from RecordQueue, as it was only being tracked in order to choose the next partition to poll from. Will add some unit tests soon.

> Choose next record to process by timestamp
> ------------------------------------------
>
>                 Key: KAFKA-8347
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8347
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> Currently PartitionGroup determines the next record to process by choosing
> the partition with the lowest stream time. However, if a partition contains
> out-of-order data, its stream time may be significantly larger than the
> timestamp of the next record. The next record should instead be chosen as
> the record with the lowest timestamp across all partitions, regardless of
> which partition it comes from or what its partition time is.
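The selection rule described in the PR above can be sketched with a toy chooser. The names are hypothetical, not the actual PartitionGroup/RecordQueue code: each partition is modeled as a FIFO of record timestamps, and the chooser picks the partition whose head record has the smallest timestamp.

```java
import java.util.*;

// Illustrative sketch of the KAFKA-8347 fix (hypothetical names, not Kafka's
// PartitionGroup): pick the partition whose *head record* has the smallest
// timestamp, rather than the partition with the smallest stream time.
public class NextRecordChooser {

    // Each queue is a FIFO of record timestamps for one partition.
    public static String choosePartition(Map<String, Deque<Long>> queues) {
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> e : queues.entrySet()) {
            Long head = e.getValue().peekFirst(); // head record's timestamp
            if (head != null && head < bestTs) {
                bestTs = head;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Deque<Long>> queues = new LinkedHashMap<>();
        // Partition A holds out-of-order data: its stream time has advanced past
        // its head record's timestamp of 2.
        queues.put("A", new ArrayDeque<>(Arrays.asList(2L, 10L)));
        // Partition B's head record has timestamp 5.
        queues.put("B", new ArrayDeque<>(Arrays.asList(5L)));
        // Choosing by head timestamp picks A (2 < 5), preserving timestamp order;
        // choosing by stream time could have picked B instead.
        System.out.println(choosePartition(queues)); // A
    }
}
```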
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837765#comment-16837765 ]

Sophie Blee-Goldman commented on KAFKA-8315:
--------------------------------------------

[~the4thamigo_uk] After I directed you to check out RecordQueue, I went back to look it over and filed the ticket John linked: you're right, it actually was going by partition time rather than by the timestamp of the head record. I've opened a simple PR with the fix [here|https://github.com/apache/kafka/pull/6719].

That said, I'm not sure this actually affects your use case. If all the data is in order, partition time should be the same as the head record's timestamp, so this should only come into play when processing out-of-order data. In your example above, partition A would have stream time = 1 when first choosing the next record, as it will not have seen the record with timestamp 4 yet.

> Cannot pass Materialized into a join operation - hence cant set retention
> period independent of grace
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
> The documentation says to use `Materialized`, not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems to be supported only for the group operation.
>
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)
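Sophie's point that the bug only matters for out-of-order data can be shown numerically. The helper below is hypothetical, not Kafka code: stream time advances as the maximum timestamp seen so far, so it can only exceed the head record's timestamp once a late record follows an earlier high-timestamp one.

```java
import java.util.*;

// Numeric illustration (hypothetical helper, not Kafka code): stream time is
// the max timestamp seen so far, so it diverges from the head record's
// timestamp only when data arrives out of order.
public class StreamTimeDemo {

    // For each position i, return the stream time after consuming records
    // 0..i-1, i.e. the value compared against while record i is the head.
    public static long[] streamTimeBeforeEachRecord(long[] timestamps) {
        long[] result = new long[timestamps.length];
        long streamTime = Long.MIN_VALUE; // no records seen yet
        for (int i = 0; i < timestamps.length; i++) {
            result[i] = streamTime;
            streamTime = Math.max(streamTime, timestamps[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // In-order data: stream time never exceeds the head record's timestamp.
        System.out.println(Arrays.toString(streamTimeBeforeEachRecord(new long[]{1, 2, 3})));
        // Out-of-order data: after seeing 4, stream time (4) exceeds the head
        // timestamps 2 and 3, so ranking partitions by stream time misranks
        // this partition relative to its actual next record.
        System.out.println(Arrays.toString(streamTimeBeforeEachRecord(new long[]{1, 4, 2, 3})));
    }
}
```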
[jira] [Resolved] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference
[ https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-8332.
------------------------------------
    Resolution: Fixed
      Assignee: Bob Barrett  (was: Konstantine Karantasis)

> Regression in handling of JoinGroupRequest disallows deterministic protocol
> selection based on order of preference
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-8332
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8332
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Konstantine Karantasis
>            Assignee: Bob Barrett
>            Priority: Blocker
>             Fix For: 2.3.0
>
> When a group of Kafka clients includes more than one embedded protocol in
> its {{JoinGroupRequest}} along with its metadata, the group membership
> protocol specifies that the protocol supported by all the members of the
> group is selected, and that if more than one protocol is supported by all
> the members, the protocol is selected based on the order of preference as
> defined in the {{JoinGroupRequest}}.
> A recent change from type {{List}} to type {{Set}} for storing the set of
> supported embedded protocols in the {{JoinGroupRequest}}, combined with the
> old handling via implicit types in the Scala code, has introduced
> non-determinism in the selection of the embedded protocol by the
> {{GroupCoordinator}}, even though the underlying type of the Set in use is
> a variant of LinkedHashSet (it respects order).
> The relevant code is:
> {code:java}
> // KafkaApis.scala
> val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
>   (protocol.name, protocol.metadata)).toList
> {code}
[jira] [Commented] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference
[ https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837770#comment-16837770 ]

ASF GitHub Bot commented on KAFKA-8332:
---------------------------------------

hachikuji commented on pull request #6695: KAFKA-8332: Restore determinism in protocol selection during JoinGroupRequest handling
URL: https://github.com/apache/kafka/pull/6695
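The order-of-preference rule this ticket relies on can be sketched with a toy selector. The names below are hypothetical and the real GroupCoordinator logic is more involved (it lets members vote among the common protocols), so treat this purely as an illustration of why an order-preserving collection matters: iterating a List or LinkedHashSet honors the stated preference, while routing the protocols through an unordered Set would make the outcome non-deterministic.

```java
import java.util.*;

// Illustrative sketch (hypothetical names, not GroupCoordinator code): among
// protocols supported by every member, pick the one that appears earliest in
// the first member's order of preference. This only works if iteration order
// reflects preference order, which an unordered Set does not guarantee.
public class ProtocolSelector {

    public static String selectProtocol(List<List<String>> memberPreferences) {
        // Candidate protocols, in the first member's order of preference.
        List<String> candidates = memberPreferences.get(0);
        for (String protocol : candidates) {
            boolean supportedByAll = true;
            for (List<String> prefs : memberPreferences) {
                if (!prefs.contains(protocol)) { supportedByAll = false; break; }
            }
            if (supportedByAll) return protocol; // first common protocol wins
        }
        throw new IllegalStateException("no common protocol");
    }

    public static void main(String[] args) {
        // Both "range" and "roundrobin" are supported by everyone; "range" is
        // chosen deterministically because it comes first in the preference list.
        String chosen = selectProtocol(Arrays.asList(
            Arrays.asList("range", "roundrobin"),
            Arrays.asList("roundrobin", "range")));
        System.out.println(chosen); // range
    }
}
```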
[jira] [Commented] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline
[ https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837772#comment-16837772 ]

ASF GitHub Bot commented on KAFKA-8171:
---------------------------------------

hachikuji commented on pull request #6515: KAFKA-8171: Set callback to null in addStopReplicaRequestForBrokers when replica …
URL: https://github.com/apache/kafka/pull/6515

> callback needs to be null when addStopReplicaRequestForBrokers when replica
> state transits to offline
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-8171
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8171
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>            Reporter: kehu
>            Priority: Major
>
> Problem:
> In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests,
> it tries to group the requests based on the deletePartition flag and the
> callback:
> val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r =>
>   !r.deletePartition && r.callback == null)
> When both conditions are met, the controller is expected to send only one
> request to the destination broker. However, when adding the requests in
> ReplicaStateMachine, it passes a non-null callback, (_, _) => (). Therefore,
> replicasToGroup is always empty, and the controller always first sends an
> empty request followed by #partitions requests.
>
> Fix: set the callback to null in addStopReplicaRequestForBrokers when the
> replica state transits to offline. PR has been created:
> https://github.com/apache/kafka/pull/6515
[jira] [Resolved] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline
[ https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-8171.
------------------------------------
    Resolution: Fixed

This was fixed by KAFKA-8237.
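The grouping condition quoted in the KAFKA-8171 report can be mirrored in a small sketch. The types below are hypothetical, not ControllerChannelManager code: they only show how a spurious non-null callback such as `(_, _) => ()` defeats batching, turning one grouped StopReplica request into one request per partition.

```java
import java.util.*;

// Illustrative sketch of the KAFKA-8171 grouping condition (hypothetical
// types, not Kafka's controller code): replicas are only batched into a single
// StopReplica request when deletePartition is false AND the callback is null.
public class StopReplicaGrouping {

    public static final class ReplicaInfo {
        final String partition;
        final boolean deletePartition;
        final Object callback; // null means "no callback"
        public ReplicaInfo(String partition, boolean deletePartition, Object callback) {
            this.partition = partition;
            this.deletePartition = deletePartition;
            this.callback = callback;
        }
    }

    // Mirrors: replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
    public static int countRequests(List<ReplicaInfo> replicas) {
        int grouped = 0, separate = 0;
        for (ReplicaInfo r : replicas) {
            if (!r.deletePartition && r.callback == null) grouped++;
            else separate++;
        }
        // All groupable replicas share one request; the rest get one each.
        return (grouped > 0 ? 1 : 0) + separate;
    }

    public static void main(String[] args) {
        Object noOpCallback = new Object(); // stands in for (_, _) => ()
        List<ReplicaInfo> withCallback = Arrays.asList(
            new ReplicaInfo("t-0", false, noOpCallback),
            new ReplicaInfo("t-1", false, noOpCallback),
            new ReplicaInfo("t-2", false, noOpCallback));
        List<ReplicaInfo> withNull = Arrays.asList(
            new ReplicaInfo("t-0", false, null),
            new ReplicaInfo("t-1", false, null),
            new ReplicaInfo("t-2", false, null));
        System.out.println(countRequests(withCallback)); // 3 - one per partition
        System.out.println(countRequests(withNull));     // 1 - a single batched request
    }
}
```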