[jira] [Created] (KAFKA-8349) Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh

2019-05-10 Thread Kengo Seki (JIRA)
Kengo Seki created KAFKA-8349:
-

 Summary: Add Windows batch files corresponding to 
kafka-delete-records.sh and kafka-log-dirs.sh
 Key: KAFKA-8349
 URL: https://issues.apache.org/jira/browse/KAFKA-8349
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Kengo Seki
Assignee: Kengo Seki


Some shell scripts don't have corresponding batch files in bin\windows.
To improve Windows platform support, I'd like to add the following batch files:

- bin\windows\kafka-delete-records.bat
- bin\windows\kafka-log-dirs.bat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8349) Add Windows batch files corresponding to kafka-delete-records.sh and kafka-log-dirs.sh

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837011#comment-16837011
 ] 

ASF GitHub Bot commented on KAFKA-8349:
---

sekikn commented on pull request #6709: KAFKA-8349. Add Windows batch files 
corresponding to kafka-delete-records.sh and kafka-log-dirs.sh
URL: https://github.com/apache/kafka/pull/6709
 
 
   Some shell scripts don't have corresponding batch files in bin\windows.
   To improve Windows platform support, this PR adds the following batch files:
   
   - bin\windows\kafka-delete-records.bat
   - bin\windows\kafka-log-dirs.bat
   
   I confirmed that they worked on Windows 10 Pro as follows:
   
   ```
   PS C:\kafka-2.2.0> .\bin\windows\kafka-log-dirs.bat --bootstrap-server 
localhost:9092 --describe
   
   (snip)
   
   Querying brokers for log directories information
   Received log directory information from brokers 0
   
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"C:\\tmp\\kafka-logs","error":null,"partitions":[{"partition":"sandbox-0","size":213,"offsetLag":0,"isFuture":false}]}]}]}
   ```
   
   ```
   PS C:\kafka-2.2.0> .\bin\windows\kafka-delete-records.bat --bootstrap-server 
localhost:9092 --offset-json-file offset-json-file.txt
   
   (snip)
   
   Executing records delete operation
   Records delete operation completed:
   partition: sandbox-0 low_watermark: 2
   ```
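
   For context (not part of this PR), the delete-records operation that the new batch file wraps is also available programmatically; below is a minimal, hypothetical Java AdminClient sketch assuming the same topic/partition/offset as the run above (sandbox-0, offset 2) and a broker on localhost:9092.

   ```java
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.admin.AdminClient;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.DeletedRecords;
   import org.apache.kafka.clients.admin.RecordsToDelete;
   import org.apache.kafka.common.TopicPartition;

   public class DeleteRecordsSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (AdminClient admin = AdminClient.create(props)) {
               TopicPartition tp = new TopicPartition("sandbox", 0);
               // Delete everything before offset 2, like the --offset-json-file entry used above
               DeletedRecords deleted = admin
                       .deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(2L)))
                       .lowWatermarks().get(tp).get();
               System.out.println("low_watermark: " + deleted.lowWatermark());
           }
       }
   }
   ```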
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Windows batch files corresponding to kafka-delete-records.sh and 
> kafka-log-dirs.sh
> --
>
> Key: KAFKA-8349
> URL: https://issues.apache.org/jira/browse/KAFKA-8349
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
>
> Some shell scripts don't have corresponding batch files in bin\windows.
> For improving Windows platform support, I'd like to add the following batch 
> files:
> - bin\windows\kafka-delete-records.bat
> - bin\windows\kafka-log-dirs.bat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8350) Splitting batches should consider topic-level message size

2019-05-10 Thread huxihx (JIRA)
huxihx created KAFKA-8350:
-

 Summary: Splitting batches should consider topic-level message size
 Key: KAFKA-8350
 URL: https://issues.apache.org/jira/browse/KAFKA-8350
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.3.0
Reporter: huxihx


Currently, producers split batches based only on the batch size. However, the 
split will never succeed when the batch size is much larger than the 
topic-level max message size.

For instance, if the batch size is set to 8 MB but we keep the default value 
for the broker-side `message.max.bytes` (1000012, about 1 MB), the producer 
will endlessly try to split a large batch but never succeed, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 61 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 62 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 63 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 64 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617){code}
A better solution is to have the producer split based on the minimum of these 
two configs. However, it is tricky for the client to get the topic-level or 
broker-level config values. There seem to be three ways to do this:
 # When the broker throws `RecordTooLargeException`, do not swallow its real 
message, since it already contains the max message size. If the message is not 
swallowed, the client can easily get it from the response.
 # Add code to issue a `DescribeConfigsRequest` to retrieve the value (see the 
AdminClient sketch below).
 # If splitting fails, lower the batch size gradually until the split 
succeeds. For example:

{code:java}
// In RecordAccumulator.java
private int steps = 1;
..
public int splitAndReenqueue(ProducerBatch bigBatch) {
    ..
    // Shrink the split size further each time a previous split attempt failed.
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
    if (dq.size() == 1) // split failed
        steps++;
    ..
}{code}
Do all of these make sense?
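
A minimal client-side sketch of option 2 (my own illustration, not existing 
producer code), looking up the effective topic-level max message size with the 
admin client; it assumes a topic named "test" and a broker on localhost:9092:
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class MaxMessageBytesLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            // max.message.bytes is the topic-level counterpart of the broker's message.max.bytes
            int maxMessageBytes =
                    Integer.parseInt(config.get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value());
            System.out.println("max.message.bytes = " + maxMessageBytes);
        }
    }
}{code}
With that value in hand, the producer could split on min(batch.size, 
max.message.bytes) instead of batch.size alone.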

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8350) Splitting batches should consider topic-level message size

2019-05-10 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-8350:
--
Description: 
Currently, producers split batches based only on the batch size. However, the 
split will never succeed when the batch size is much larger than the 
topic-level max message size.

For instance, if the batch size is set to 8 MB but we keep the default value 
for the broker-side `message.max.bytes` (1000012, about 1 MB), the producer 
will endlessly try to split a large batch but never succeed, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 61 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 62 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 63 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 64 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617){code}
A better solution is to have the producer split based on the minimum of these 
two configs. However, it is tricky for the client to get the topic-level or 
broker-level config values. There seem to be three ways to do this:
 # When the broker throws `RecordTooLargeException`, do not swallow its real 
message, since it already contains the max message size. If the message is not 
swallowed, the client can easily get it from the response.
 # Add code to issue a `DescribeConfigsRequest` to retrieve the value.
 # If splitting fails, decrease the batch size gradually until the split 
succeeds. For example:

{code:java}
// In RecordAccumulator.java
private int steps = 1;
..
public int splitAndReenqueue(ProducerBatch bigBatch) {
    ..
    // Shrink the split size further each time a previous split attempt failed.
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
    if (dq.size() == 1) // split failed
        steps++;
    ..
}{code}
Do all of these make sense?

 

 

 

  was:
Currently, producers do the batch splitting based on the batch size. However, 
the split will never succeed when batch size is greatly larger than the 
topic-level max message size.

For instance, if the batch size is set to 8MB but we maintain the default value 
for broker-side `message.max.bytes` (112, about1MB), producer will 
endlessly try to split a large batch but never succeeded, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 61 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 62 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 63 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 64 on topic-partition test-0, splitting and retrying 
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender:617){code}
A better solution is to have producer do splitting based on the minimum of 
these two configs. However, it is tricky for the client to get the topic-level 
or broker-level config values. Seems  there could be three ways to do this:
 # When broker throws `RecordTooLargeException`, do not swallow its real 
message since it contains the max message size already. If the message is not 
swallowed, the client easily gets it from the response.
 # Add code to issue  `DescribeConfigsRequest` to retrieve the value.
 # If splitting failed, lower down the batch size gradually until the split is 
successful. For  example, 

{code:java}
// In RecordAccumulator.java
private int steps = 1;
..
public int splitAndReenqueue(ProducerBatch bigBatch) {
..
Deque dq = bigBatch.split(this.batchSize / steps);
if (dq.size() == 1) // s

[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew commented on KAFKA-8315:
---

[~ableegoldman] Can I sanity-check my understanding here? From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected and the next record 
will be 2, not 1. Is that right?
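
To make the ordering concrete, here is a small toy model of the behaviour 
described above (my own code with made-up names, not the actual Streams 
classes): each queue's partition time is the maximum timestamp read so far, and 
the priority queue picks the queue with the smallest partition time.
{code:java}
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

public class QueueOrderingSketch {
    static class ToyQueue {
        final String name;
        final Deque<Long> records = new ArrayDeque<>();
        long partitionTime = Long.MIN_VALUE; // highest timestamp read into this queue so far

        ToyQueue(String name, long... timestamps) {
            this.name = name;
            for (long ts : timestamps) {
                records.add(ts);
                partitionTime = Math.max(partitionTime, ts);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue a = new ToyQueue("A", 1, 4);
        ToyQueue b = new ToyQueue("B", 2, 3);
        PriorityQueue<ToyQueue> nonEmptyQueuesByTime =
                new PriorityQueue<>(Comparator.comparingLong((ToyQueue q) -> q.partitionTime));
        nonEmptyQueuesByTime.add(a);
        nonEmptyQueuesByTime.add(b);

        // Prints "B -> 2": B wins because its partitionTime (3) is lower than A's (4),
        // even though A holds the overall earliest record (timestamp 1).
        ToyQueue next = nonEmptyQueuesByTime.poll();
        System.out.println(next.name + " -> " + next.records.peekFirst());
    }
}{code}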

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)
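
For reference, a minimal sketch (my own illustration against the 2.2 DSL, with 
made-up topic names) of the limitation described above: the stream-stream join 
overloads accept {{JoinWindows}} and {{Joined}}, but none of them take a 
{{Materialized}}, so the window store retention cannot be configured there 
independently of the grace period.
{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class JoinSignatureSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        // JoinWindows carries the window size and grace period; Joined only carries serdes.
        // There is no join overload that accepts a Materialized for the join's window stores.
        KStream<String, String> joined = left.join(
                right,
                (l, r) -> l + "," + r,
                JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        joined.to("joined-topic");
    }
}{code}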



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:26 AM:


[~ableegoldman] Can I sanity-check my understanding here? From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?


was (Author: the4thamigo_uk):
[~ableegoldman] Can I sanity check my understanding here. From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B wlil be selected and the next record 
will be 2, not 1. Is that right?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:27 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances 
by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the 
most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?


was (Author: the4thamigo_uk):
[~ableegoldman] Can I sanity check my understanding here. From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:27 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances 
by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the 
most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:31 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each {{RecordQueue}}, 
then A would be selected, and record 1 would be read first.


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:32 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each {{RecordQueue}}, 
then A would be selected, and record 1 would be read first.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:40 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}*** and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

*** for convenience I am quoting these times as second offsets from the start 
time 190258000ms

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}*** and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

*** for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime 
= 1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:43 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 1000}}. So, the left stream would be selected first, until 
all records are consumed in the left stream, then the right stream records 
would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime 
= 1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:47 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So, the left stream would be selected first, until all 
records are consumed in the left stream, then the right stream records would be 
consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 1000}}. So, the left stream would be selected first, until 
all records are consumed in the left stream, then the right stream records 
would be consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:48 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}}s are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So, the left stream would be selected first, until all 
records are consumed up to time 100 in the left stream, then the right stream 
records would be consumed.

In this case, wouldn't many of the left join windows expire before the first 
record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei]Can I sanity check my understanding here. From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So, the left stream would be selected first, until all 
records are consumed in the left stream, then the right stream records would be 
consumed.

In this case, wouldnt the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837118#comment-16837118
 ] 

ASF GitHub Bot commented on KAFKA-8348:
---

mjsax commented on pull request #6707: KAFKA-8348 document optimized
URL: https://github.com/apache/kafka/pull/6707
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8348:
---
Affects Version/s: 1.0.0
   1.0.1
   1.0.2
   1.1.0
   1.1.1
   2.0.0
   2.0.1
   2.1.0
   2.2.0
   2.1.1

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Lifei Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lifei Chen resolved KAFKA-8348.
---
Resolution: Fixed

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8348:
---
Fix Version/s: 2.2.2
   2.1.2
   2.3.0
   2.0.2
   1.1.2
   1.0.3

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.2
>
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837131#comment-16837131
 ] 

Matthias J. Sax commented on KAFKA-8348:


[~vahid] I marked this as fixed version 2.2.2 – if we roll a new RC, it should 
be updated to 2.2.1.

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.2
>
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8348:
---
Reviewer:   (was: Matthias J. Sax)

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.2
>
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-10 Thread Eric (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837209#comment-16837209
 ] 

Eric commented on KAFKA-8313:
-

Thanks [~guozhang] for the follow-up.  Is there a place I can grab nightly 
builds of Kafka?

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 2.3.0
>
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck in the REBALANCING state.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?
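
For reference, a minimal sketch of the reproduction described above (topic name 
and configuration values are illustrative, not taken from the report):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StateListenerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-listener-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic"); // step 2: a simple consumer-only topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Step 3: register a listener and log every state transition.
        streams.setStateListener((newState, oldState) ->
                System.out.println("KafkaStreams: " + oldState + " -> " + newState));
        streams.start();
        // Step 5: deleting "my-input-topic" with kafka-topics.sh should eventually
        // surface a terminal state; the report observes REBALANCING instead.
    }
}
{code}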



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837303#comment-16837303
 ] 

John Roesler commented on KAFKA-8315:
-

Ah, yes. My apologies. I was mistaken about the PartitionGroup logic. I think 
you're right, and actually, [~ableegoldman] has filed 
https://issues.apache.org/jira/browse/KAFKA-8347, so I guess she agrees, too.

I think it's actually a pretty straightforward change, and I actually don't 
think it needs a KIP either. Should we try to get this change in for the 15 May 
feature freeze for 2.3?

Since you've taken the time to get familiar with the code, do you want to send 
a PR, [~the4thamigo_uk]? Offhand, I think we can just redefine 
RecordQueue.timestamp() to be the head timestamp instead of the high water-mark 
time. And, of course write/update the relevant tests.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-10 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-8187:
--

Assignee: Bill Bejeck  (was: Matthias J. Sax)

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Bill Bejeck
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and it does not retry acquiring the lock because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event of the threads 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then at a period that is 1 to 2 intervals of [6](which is default of 10 
> minutes) after the reassignment had completed the stateDirCleaner thread will 
> execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquire the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will get cleaned 
> up by the stateDirCleaner thread 10 to 20 minutes later and the record loss 
> issue will be hidden.
> [0] 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
> [1] 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#
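
To make the key step concrete, here is a small, self-contained sketch (not the 
actual TaskManager code; all names are made up) of the readiness check at [4], 
which consults only the active tasks and therefore reports success even though 
the standby task never obtained its state-directory lock:

{code:java}
import java.util.Arrays;
import java.util.List;

public class StandbyLockSketch {

    static class Task {
        final String id;
        final boolean standby;
        final boolean lockAcquired;

        Task(String id, boolean standby, boolean lockAcquired) {
            this.id = id;
            this.standby = standby;
            this.lockAcquired = lockAcquired;
        }
    }

    // Mirrors the shape of the check at [4]: only active tasks are consulted.
    static boolean allAssignedTasksRunning(List<Task> tasks) {
        return tasks.stream()
                    .filter(task -> !task.standby)
                    .allMatch(task -> task.lockAcquired);
    }

    public static void main(String[] args) {
        Task active = new Task("0_0", false, true);   // active task locked its directory
        Task standby = new Task("1_0", true, false);  // LockException: old owner still holds the lock

        // Prints "true": the thread moves to RUNNING, the standby never retries the
        // lock, consumes nothing, and its directory is later removed by the cleaner.
        System.out.println(allAssignedTasksRunning(Arrays.asList(active, standby)));
    }
}
{code}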

[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837314#comment-16837314
 ] 

Vahid Hashemian commented on KAFKA-8348:


Sounds good [~mjsax].

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.2
>
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837348#comment-16837348
 ] 

John Roesler commented on KAFKA-7895:
-

FYI, 2.2.1 RC0 vote is in progress. 
(https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E)

Please feel free to test it out, and reply on the vote thread if you have some 
trouble with it.

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.1.2, 2.2.1
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8254) Suppress incorrectly passes a null topic to the serdes

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837350#comment-16837350
 ] 

John Roesler commented on KAFKA-8254:
-

FYI, 2.2.1 RC0 vote is in progress. 
(https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E)

Please feel free to test it out, and reply on the vote thread if you have some 
trouble with it.

> Suppress incorrectly passes a null topic to the serdes
> --
>
> Key: KAFKA-8254
> URL: https://issues.apache.org/jira/browse/KAFKA-8254
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> For example, in KTableSuppressProcessor, we do:
> {noformat}
> final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, 
> key));
> {noformat}
> This violates the contract of Serializer (and Deserializer), and breaks 
> integration with known Serde implementations.
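
As an illustration of why this matters, here is a hypothetical topic-aware 
serializer (not a real Kafka serde; the class is made up) that follows the 
common pattern of deriving per-topic state from the topic name and therefore 
breaks when serialize() receives a null topic:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serializer;

public class TopicAwareSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(final String topic, final String data) {
        // Per the Serializer contract, topic identifies the destination topic;
        // with the null topic passed by suppress(), this line throws.
        final String subject = Objects.requireNonNull(topic, "topic must not be null") + "-key";
        return (subject + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
{code}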



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8289) KTable, Long> can't be suppressed

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837351#comment-16837351
 ] 

John Roesler commented on KAFKA-8289:
-

FYI, 2.2.1 RC0 vote is in progress. 
(https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E)

Please feel free to test it out, and reply on the vote thread if you have some 
trouble with it.

> KTable, Long>  can't be suppressed
> ---
>
> Key: KAFKA-8289
> URL: https://issues.apache.org/jira/browse/KAFKA-8289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Broker on a Linux, stream app on my win10 laptop. 
> I add one row log.message.timestamp.type=LogAppendTime to my broker's 
> server.properties. stream app all default config.
>Reporter: Xiaolin Jia
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> I wrote a simple stream app following the official developer guide [Stream 
> DSL|[https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results]].
>  but I got more than one [Window Final 
> Results|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31]
>  from a session time window.
> time ticker A -> (4,A) / 25s,
> time ticker B -> (4, B) / 25s, all sent to the same topic 
> below is my stream app code 
> {code:java}
> kstreams[0]
> .peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
> .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
> .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
> .count()
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), 
> k.key(), v));
> {code}
> {{here is my log print}}
> {noformat}
> 2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556106587744, 
> endMs=1556107129191},k=A,v=20
> 2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107426409},k=B,v=9
> 2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107445012},k=A,v=1
> 2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107451397},k=B,v=10
> 2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107469935},k=A,v=2
> 2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:05:19.599  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:05:20.045  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=155610747639

[jira] [Commented] (KAFKA-8204) Streams may flush state stores in the incorrect order

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837349#comment-16837349
 ] 

John Roesler commented on KAFKA-8204:
-

FYI, 2.2.1 RC0 vote is in progress. 
(https://lists.apache.org/thread.html/3486798e63ae666fc336ce9009f07c7fdf66a96badc1fed63bcbd2ed@%3Cdev.kafka.apache.org%3E)

Please feel free to test it out, and reply on the vote thread if you have some 
trouble with it.

> Streams may flush state stores in the incorrect order
> -
>
> Key: KAFKA-8204
> URL: https://issues.apache.org/jira/browse/KAFKA-8204
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.2.1
>
>
> Cached state stores may forward records during a flush call, so Streams 
> should flush the stores in topological order. Otherwise, Streams may flush a 
> downstream store before an upstream one, resulting in sink results being 
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895 .
> The fix is simply to flush the stores in topological order, then when the 
> upstream store forwards records to a downstream stateful processor, the 
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until 
> they report there is nothing left to flush, but this requires a public API 
> change to enable state stores to report whether they need a flush or not.
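
A minimal sketch of the ordering idea (not the actual Streams internals; the 
interface and names below are made up):

{code:java}
import java.util.Arrays;
import java.util.List;

public class FlushOrderSketch {

    interface FlushableStore {
        String name();
        void flush(); // a cached store may forward records downstream from here
    }

    // Flush upstream stores first so that anything they forward during flush()
    // reaches downstream stateful processors before those stores are flushed.
    static void flushInTopologicalOrder(List<FlushableStore> storesUpstreamFirst) {
        for (FlushableStore store : storesUpstreamFirst) {
            store.flush();
        }
    }

    public static void main(String[] args) {
        flushInTopologicalOrder(Arrays.asList(store("upstream-agg"), store("downstream-agg")));
    }

    static FlushableStore store(final String name) {
        return new FlushableStore() {
            public String name() { return name; }
            public void flush() { System.out.println("flushing " + name); }
        };
    }
}
{code}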



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8351) Log cleaner must handle transactions spanning multiple segments

2019-05-10 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8351:
--

 Summary: Log cleaner must handle transactions spanning multiple 
segments
 Key: KAFKA-8351
 URL: https://issues.apache.org/jira/browse/KAFKA-8351
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When cleaning transactions, we have to do some bookkeeping to keep track of 
which transactions still have data left around. As long as there is still data, 
we cannot remove the transaction marker. The problem is that we do this 
tracking at the segment level. We do not carry over the ongoing transaction 
state between segments. So if the first entry in a segment is a marker, we 
incorrectly clean it. In the worst case, data from a committed transaction 
could become aborted.
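
A toy model of the bookkeeping problem (these are not the log cleaner's actual 
data structures; everything here is illustrative):

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TxnCleanerSketch {

    static class Entry {
        final long producerId;
        final boolean isMarker;

        Entry(long producerId, boolean isMarker) {
            this.producerId = producerId;
            this.isMarker = isMarker;
        }
    }

    // Counts commit/abort markers that would be removed even though data from
    // their transaction is still present in an earlier segment.
    static int wronglyCleanedMarkers(List<List<Entry>> segments, boolean carryStateAcrossSegments) {
        Set<Long> ongoingWithData = new HashSet<>();
        int wronglyCleaned = 0;
        for (List<Entry> segment : segments) {
            if (!carryStateAcrossSegments) {
                ongoingWithData.clear(); // the bug: tracking resets at each segment boundary
            }
            for (Entry entry : segment) {
                if (entry.isMarker) {
                    if (!ongoingWithData.remove(entry.producerId)) {
                        wronglyCleaned++; // no data seen for this txn, so the marker looks removable
                    }
                } else {
                    ongoingWithData.add(entry.producerId);
                }
            }
        }
        return wronglyCleaned;
    }

    public static void main(String[] args) {
        // Producer 42's transaction spans two segments: data in the first, marker at the start of the second.
        List<List<Entry>> segments = Arrays.asList(
                Arrays.asList(new Entry(42L, false)),
                Arrays.asList(new Entry(42L, true)));
        System.out.println("per-segment tracking:  " + wronglyCleanedMarkers(segments, false)); // 1
        System.out.println("carried-over tracking: " + wronglyCleanedMarkers(segments, true));  // 0
    }
}
{code}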



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7858) Replace JoinGroup request/response with automated protocol

2019-05-10 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7858.

Resolution: Fixed

> Replace JoinGroup request/response with automated protocol
> --
>
> Key: KAFKA-7858
> URL: https://issues.apache.org/jira/browse/KAFKA-7858
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC

2019-05-10 Thread Jose Armando Garcia Sancio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-8286:
--
Description: 
Tracking issue for implementing KIP-460. Tasks:

# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# [Done] Search and replace any usage of “preferred” in the code
# Add test for command
# [Done] Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# Deprecated since... needs to be updated
# Review PR
# Merge PR

  was:
Tracking issue for implementing KIP-460. Tasks:

# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# [Done] Search and replace any usage of “preferred” in the code
# Add test for command
# Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# Deprecated since... needs to be updated
# Review PR
# Merge PR


> KIP-460 Admin Leader Election RPC
> -
>
> Key: KAFKA-8286
> URL: https://issues.apache.org/jira/browse/KAFKA-8286
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin, clients, core
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for implementing KIP-460. Tasks:
> # [Done] Design KIP
> # [Done] Review KIP
> # [Done] Approve KIP
> # [Done] Update RPC to support KIP
> # [Done] Update controller to support KIP
> # [Done] Create CLI command (kafka-leader-election) that implements the KIP
> # [Done] Search and replace any usage of “preferred” in the code
> # Add test for command
> # [Done] Add test for controller functionality
> # Revisit all of the documentation - generate and audit the new javadocs
> # Deprecated since... needs to be updated
> # Review PR
> # Merge PR



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8247) Duplicate error handling in kafka-server-start.sh and actual Kafka class

2019-05-10 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837510#comment-16837510
 ] 

Sönke Liebau commented on KAFKA-8247:
-

I've experimented a bit with this: I removed all those checks and diff'ed the 
output of all shell scripts when run without parameters.

See the results below; the number at the very end is the return code, which 
stays the same in all instances.
 There is one additional script that differs: kafka-consumer-perf-test.sh - but 
this is only because it has a random element in the output that changes every 
run (how useful that is is another debate).

Overall this looks good to me; in some cases the output is even significantly 
better, because we are no longer killing the usage reporting from the Java 
class.

The output from the two connect classes might be improved a little I guess..

zookeeper-shell is a bit of a special case, as we change behavior by removing 
this check. Currently it returns with a help message; without that check it 
would default to connecting to localhost (behavior that I actually would 
prefer).
 If anybody has this call in a script somewhere, it would hang post-change, as 
it never returns. I'm not sure why someone would have that though, save for 
testing purposes.

So overall I think we'd be good to make the change; I'm unsure about the 
ZooKeeper shell and the -daemon flag. Does anybody have an opinion on that?

Ping [~hachikuji] since you noticed this duplicate check as well.

 
{noformat}
==
./connect-distributed.sh.out
==

Old output:
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties
0

-

New output:
[2019-05-10 09:53:16,310] INFO Usage: ConnectDistributed worker.properties 
(org.apache.kafka.connect.cli.ConnectDistributed:64)
0


==
./connect-standalone.sh.out
==

Old output:
USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties
0

-

New output:
[2019-05-10 09:53:18,139] INFO Usage: ConnectStandalone worker.properties 
connector1.properties [connector2.properties ...] 
(org.apache.kafka.connect.cli.ConnectStandalone:62)
0


==
./kafka-run-class.sh.out
==

Old output:
USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname 
[opts]
0

-

New output:
Usage: java [-options] class [args...]
   (to execute a class)
   or  java [-options] -jar jarfile [args...]
   (to execute a jar file)
where options include:
-d32  use a 32-bit data model if available
-d64  use a 64-bit data model if available
-server   to select the "server" VM
  The default VM is server,
  because you are running on a server-class machine.


-cp <class search path of directories and zip/jar files>
-classpath <class search path of directories and zip/jar files>
  A : separated list of directories, JAR archives,
  and ZIP archives to search for class files.
-D<name>=<value>
  set a system property
-verbose:[class|gc|jni]
  enable verbose output
-version  print product version and exit
-version:<value>
  Warning: this feature is deprecated and will be removed
  in a future release.
  require the specified version to run
-showversion  print product version and continue
-jre-restrict-search | -no-jre-restrict-search
  Warning: this feature is deprecated and will be removed
  in a future release.
  include/exclude user private JREs in the version search
-? -help  print this help message
-X  print help on non-standard options
-ea[:<packagename>...|:<classname>]
-enableassertions[:<packagename>...|:<classname>]
  enable assertions with specified granularity
-da[:<packagename>...|:<classname>]
-disableassertions[:<packagename>...|:<classname>]
  disable assertions with specified granularity
-esa | -enablesystemassertions
  enable system assertions
-dsa | -disablesystemassertions
  disable system assertions
-agentlib:<libname>[=<options>]
  load native agent library <libname>, e.g. -agentlib:hprof
  see also, -agentlib:jdwp=help and -agentlib:hprof=help
-agentpath:<pathname>[=<options>]
  load native agent library by full pathname
-javaagent:<jarpath>[=<options>]
  load Java programming language agent, see java.lang.instrument
-splash:<imagepath>
  show splash screen with specified image
See http://www.oracle.com/technetwork/java/javase/documentation/index.html for 
more details.
0


==
./kafka-server-start.sh.out
==

Old output:
USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override 
property=value]*
0

-

New output:
[2019-05-10 09:53:58,855] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jCont

[jira] [Resolved] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition

2019-05-10 Thread tdp (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tdp resolved KAFKA-8339.

Resolution: Not A Problem

Marking as resolved. This is working as expected from the streams side and the 
bug is in user-specific aggregation logic.

> At-least-once delivery guarantee seemingly not met due to async commit / 
> produce failure race condition
> ---
>
> Key: KAFKA-8339
> URL: https://issues.apache.org/jira/browse/KAFKA-8339
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: tdp
>Priority: Major
>
> We have hit a race condition several times now between the StreamThread 
> committing its offsets for a task before the task has fully processed the 
> record through the topology.
>  
> Consider part of a topology that looks like this:
>  
> TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 -> 
> KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC 
> T2
>  
> Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these 
> records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records 
> using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not 
> all necessary records from topic T1 have been consumed yet or an object 
> representing an aggregation of records if all necessary records from topic T1 
> have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is 
> null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 
> node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into 
> another object type. KSTREAM-SINK-NODE1 then attempts to produce this other 
> object to topic T2.
>  
> The race condition occurs when the stream thread commits its offsets for 
> topic T1 after it consumes some or all of the necessary records from topic T1 
> for an aggregation but before it gets the failure response back from the 
> async produce kicked off by KSTREAM-SINK-NODE1.
>  
> We are running with a LogAndFailExceptionHandler, so when the stream thread 
> tries to commit the next time it fails and the stream thread shuts itself 
> down. The stream task is then reassigned to another stream thread, which 
> reads the offsets previously committed by the original stream thread. That 
> means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to 
> consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 
> will never end up producing the required records to topic T2. This is why it 
> seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 
> never successfully processed records and the stream application continued on 
> past it.
> Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which 
> increases the likelihood of occurrence of the issue when all retries fail 
> since it widens the window at which the async offset commit can occur before 
> the produce record request is marked as failed.
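
A condensed sketch of the topology described above (the stateful 
KSTREAM-TRANSFORMVALUES-NODE1 step is replaced by a stateless placeholder to 
keep the example short; apart from the topic names T1 and T2, everything is 
illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class AtLeastOnceTopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("T1", Consumed.with(Serdes.String(), Serdes.String()))
               // Placeholder for the stateful aggregation: emits null until "complete".
               .mapValues(value -> value.endsWith("#last") ? value : null)
               .filter((key, value) -> value != null)            // KSTREAM-FILTER-NODE1
               .mapValues(String::toUpperCase)                   // KSTREAM-MAPVALUES-NODE1
               .to("T2", Produced.with(Serdes.String(), Serdes.String())); // KSTREAM-SINK-NODE1

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "race-condition-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // As noted above, retries widen the window in which offsets for T1 can be
        // committed before the async produce to T2 is finally reported as failed.
        props.put(StreamsConfig.RETRIES_CONFIG, 10);

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}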



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-8348:
---
Fix Version/s: (was: 2.2.2)
   2.2.1

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> there is an out-of-date and incorrect example in kafkaStreams.java for the 
> current version.
>  * Map is not supported for initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> 
> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-8352:
--

 Summary: Connect System Tests are failing with 404
 Key: KAFKA-8352
 URL: https://issues.apache.org/jira/browse/KAFKA-8352
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar
 Fix For: 2.3.0


[https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
server is started. As a result, the Connect system tests are failing with a 
404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837590#comment-16837590
 ] 

ASF GitHub Bot commented on KAFKA-8352:
---

mageshn commented on pull request #6713: KAFKA-8352 : Fix Connect System test 
failure 404 Not Found
URL: https://github.com/apache/kafka/pull/6713
 
 
   Retry when there is a 404 error from the Connect service. This could happen 
when Connect is first started and the resources are not yet initialized.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.3.0
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8265) Connect Client Config Override policy

2019-05-10 Thread Magesh kumar Nandakumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Magesh kumar Nandakumar updated KAFKA-8265:
---
Affects Version/s: 2.3.0
 Priority: Major  (was: Minor)
Fix Version/s: 2.3.0

> Connect Client Config Override policy
> -
>
> Key: KAFKA-8265
> URL: https://issues.apache.org/jira/browse/KAFKA-8265
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Major
> Fix For: 2.3.0
>
>
> Right now, each source connector and sink connector inherits its client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should allow the "producer." or "consumer." configurations to be overridden 
> in accordance with an override policy determined by the administrator.
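
For illustration, a small sketch of the current prefix behaviour; the 
"producer."/"consumer." keys are standard client configs, while the override 
key at the end is purely hypothetical (the actual mechanism is whatever the 
policy and KIP define):

{code:java}
import java.util.Properties;

public class WorkerClientConfigSketch {
    public static void main(String[] args) {
        // Worker-level properties: today these apply to every connector of the matching type.
        Properties worker = new Properties();
        worker.put("producer.compression.type", "lz4"); // all source connectors' producers
        worker.put("consumer.max.poll.records", "500"); // all sink connectors' consumers

        // Hypothetical per-connector override that an administrator's policy would
        // have to allow (key name is illustrative only).
        Properties connectorConfig = new Properties();
        connectorConfig.put("producer.override.compression.type", "gzip");

        System.out.println(worker);
        System.out.println(connectorConfig);
    }
}
{code}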



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
Description: The replica fetcher thread terminates in case one of the 
partitions being monitored fails. This leads to under-replicated partitions. 
The thread behavior can be improved by dropping that particular partition and 
continuing with the rest of the partitions.  (was: The replica fetcher thread 
terminates in case one of the partitions being monitors fails. It leads to 
under-replicated partitions. The thread behavior can be improved by dropping 
that particular partition and continue handling rest of the partitions.)

> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
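
A rough sketch of the proposed behaviour (an illustrative loop, not the actual 
ReplicaFetcherThread code):

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class FetcherLoopSketch {

    // Instead of letting one bad partition kill the whole fetcher thread,
    // drop that partition and keep serving the healthy ones.
    static void fetchOnce(Set<String> assignedPartitions) {
        for (Iterator<String> it = assignedPartitions.iterator(); it.hasNext(); ) {
            String partition = it.next();
            try {
                fetchAndAppend(partition);
            } catch (RuntimeException e) {
                System.err.println("Dropping failed partition " + partition + ": " + e.getMessage());
                it.remove(); // it becomes under-replicated, but the thread stays alive
            }
        }
    }

    static void fetchAndAppend(String partition) {
        if (partition.endsWith("-bad")) {
            throw new RuntimeException("log append failed");
        }
    }

    public static void main(String[] args) {
        Set<String> partitions = new HashSet<>(Arrays.asList("topicA-0", "topicB-1-bad"));
        fetchOnce(partitions);
        System.out.println("still fetching: " + partitions);
    }
}
{code}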



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8050) remove KafkaMbean when network close

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8050:
---
Fix Version/s: (was: 2.2.2)

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8050
> URL: https://issues.apache.org/jira/browse/KAFKA-8050
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
>
> the broker server will run out of memory (OOM) when 
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many 
> KafkaMbeans in the broker
> The reason is that the broker forgets to remove the KafkaMbean when it detects 
> that the connection has closed.
> h2.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8048) remove KafkaMbean when network close

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8048:
---
Fix Version/s: (was: 2.2.2)

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8048
> URL: https://issues.apache.org/jira/browse/KAFKA-8048
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
>
> the broker server will run out of memory (OOM) when 
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many 
> KafkaMbeans in the broker
> The reason is that the broker forgets to remove the KafkaMbean when it detects 
> that the connection has closed.
> h2.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE

2019-05-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837611#comment-16837611
 ] 

Matthias J. Sax commented on KAFKA-8240:


[~vahid] Are you rolling a new RC? You updated the other ticket but not this one?

> Source.equals() can fail with NPE
> -
>
> Key: KAFKA-8240
> URL: https://issues.apache.org/jira/browse/KAFKA-8240
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 2.3.0, 2.1.2, 2.2.2
>
>
> Reported on a PR: 
> [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795]
> InternalTopologyBuilder#Source.equals() might fail with NPE if 
> `topicPattern==null`.
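
A minimal sketch of the null-safe comparison (a simplified stand-in for the 
Source class; field names are illustrative):

{code:java}
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

// Simplified stand-in for InternalTopologyBuilder.Source: a source node either
// subscribes to a fixed set of topics or to a pattern, so one field may be null.
public class Source {
    final Set<String> topics;     // may be null when a pattern is used
    final Pattern topicPattern;   // may be null when fixed topics are used

    Source(Set<String> topics, Pattern topicPattern) {
        this.topics = topics;
        this.topicPattern = topicPattern;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Source)) return false;
        final Source other = (Source) o;
        // Pattern has no value-based equals(), so compare pattern strings, guarding against null.
        final String thisPattern = topicPattern == null ? null : topicPattern.pattern();
        final String otherPattern = other.topicPattern == null ? null : other.topicPattern.pattern();
        return Objects.equals(topics, other.topics) && Objects.equals(thisPattern, otherPattern);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topics, topicPattern == null ? null : topicPattern.pattern());
    }
}
{code}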



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8051) remove KafkaMbean when network close

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8051:
---
Fix Version/s: (was: 2.2.2)

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8051
> URL: https://issues.apache.org/jira/browse/KAFKA-8051
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
>
> the broker server will run out of memory (OOM) when 
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many 
> KafkaMbeans in the broker
> The reason is that the broker forgets to remove the KafkaMbean when it detects 
> that the connection has closed.
> h2.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8049) remove KafkaMbean when network close

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8049:
---
Fix Version/s: (was: 2.2.2)

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8049
> URL: https://issues.apache.org/jira/browse/KAFKA-8049
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
>
> the broker server will run out of memory (OOM) when 
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many 
> KafkaMbeans in the broker
> The reason is that the broker forgets to remove the KafkaMbean when it detects 
> that the connection has closed.
> h2.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7978) Flaky Test SaslSslAdminClientIntegrationTest#testConsumerGroups

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7978:
---
Fix Version/s: (was: 2.2.2)
   (was: 2.3.0)

> Flaky Test SaslSslAdminClientIntegrationTest#testConsumerGroups
> ---
>
> Key: KAFKA-7978
> URL: https://issues.apache.org/jira/browse/KAFKA-7978
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/25/]
> {quote}java.lang.AssertionError: expected:<2> but was:<0> at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:645) at 
> org.junit.Assert.assertEquals(Assert.java:631) at 
> kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1157)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7946) Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup

2019-05-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7946:
---
Fix Version/s: (was: 2.2.2)

> Flaky Test DeleteConsumerGroupsTest#testDeleteNonEmptyGroup
> ---
>
> Key: KAFKA-7946
> URL: https://issues.apache.org/jira/browse/KAFKA-7946
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/17/]
> {quote}java.lang.NullPointerException at 
> kafka.admin.DeleteConsumerGroupsTest.testDeleteNonEmptyGroup(DeleteConsumerGroupsTest.scala:96){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests

2019-05-10 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837616#comment-16837616
 ] 

Matthias J. Sax commented on KAFKA-6789:


[~vahid] Does this need a fix version update from 2.2.2 to 2.2.1?

> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.2
>
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We 
> could consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.
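
(For illustration only: until such internal retries exist, callers typically loop on retriable errors themselves; a rough sketch follows, with the bootstrap address, topic name, and retry budget made up.)

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.RetriableException;

public class CreateTopicWithRetry {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
            for (int attempt = 1; attempt <= 5; attempt++) {
                try {
                    admin.createTopics(Collections.singleton(topic)).all().get();
                    break;
                } catch (ExecutionException e) {
                    // Only causes that map to RetriableException (e.g. the coordinator
                    // still loading) are retried; everything else is rethrown as fatal.
                    if (!(e.getCause() instanceof RetriableException) || attempt == 5) {
                        throw e;
                    }
                    Thread.sleep(1000L * attempt); // simple backoff between attempts
                }
            }
        }
    }
}
{code}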



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-8352:
-
Affects Version/s: 2.2.1
   2.1.2
   2.0.2

> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.3.0
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-8352:
-
Fix Version/s: 2.2.1
   2.1.2
   2.0.2

> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE

2019-05-10 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837618#comment-16837618
 ] 

Vahid Hashemian commented on KAFKA-8240:


[~mjsax] Sorry I missed it. Updated. Feel free to update any others that should 
also be included in RC1. Thanks.

> Source.equals() can fail with NPE
> -
>
> Key: KAFKA-8240
> URL: https://issues.apache.org/jira/browse/KAFKA-8240
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 2.3.0, 2.1.2, 2.2.2
>
>
> Reported on a PR: 
> [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795]
> InternalTopologyBuilder#Source.equals() might fail with NPE if 
> `topicPattern==null`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8240) Source.equals() can fail with NPE

2019-05-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-8240:
---
Fix Version/s: (was: 2.2.2)
   2.2.1

> Source.equals() can fail with NPE
> -
>
> Key: KAFKA-8240
> URL: https://issues.apache.org/jira/browse/KAFKA-8240
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Reported on a PR: 
> [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795]
> InternalTopologyBuilder#Source.equals() might fail with NPE if 
> `topicPattern==null`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6789) Add retry logic in AdminClient requests

2019-05-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-6789:
---
Fix Version/s: (was: 2.2.2)
   2.2.1

> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We 
> could consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests

2019-05-10 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837620#comment-16837620
 ] 

Vahid Hashemian commented on KAFKA-6789:


Updated.

> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example COORDINATOR_LOADING_IN_PROGRESS. We 
> could consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor

2019-05-10 Thread Almog Gavra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837631#comment-16837631
 ] 

Almog Gavra commented on KAFKA-8305:


Approved 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]

> AdminClient should support creating topics with default partitions and 
> replication factor
> -
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.
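
(For illustration only: this is roughly what topic creation looks like today, with partition count and replication factor required up front; the values below are placeholders. The proposal would let callers omit them so the cluster defaults apply.)

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicToday {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Both numPartitions and replicationFactor must currently be supplied here,
            // even when the caller would be happy with the cluster defaults.
            NewTopic topic = new NewTopic("example-topic", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}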



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor

2019-05-10 Thread Almog Gavra (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-8305:
---
Summary: AdminClient should support creating topics with default partitions 
and replication factor  (was: AdminClient should support creating topics with 
`default.replication.factor`)

> AdminClient should support creating topics with default partitions and 
> replication factor
> -
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8353) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms after enabling SASL PLAINTEXT authentication

2019-05-10 Thread goutham (JIRA)
goutham created KAFKA-8353:
--

 Summary: org.apache.kafka.common.errors.TimeoutException: Failed 
to update metadata after 60000 ms after enabling SASL PLAINTEXT authentication
 Key: KAFKA-8353
 URL: https://issues.apache.org/jira/browse/KAFKA-8353
 Project: Kafka
  Issue Type: Bug
  Components: documentation, security
Affects Versions: 0.10.2.1
Reporter: goutham


I'm running into a timeout exception when I try to run a producer and consumer 
through Java or the console.
 
*kafka server.properties*

advertised.host.name=127.0.0.1
listeners=SASL_PLAINTEXT://127.0.0.1:9090
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9090

 

*kafka server JAAS conf*

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_test="test";
};

 

 

*client producer/consumer properties*

String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, "test", "test");
brokers.delete(brokers.length() - 1, brokers.length());
properties.put("bootstrap.servers", brokers.toString());
properties.put("retry.backoff.ms", "1000");
properties.put("reconnect.backoff.ms", "1000");
properties.put("max.request.size", "5242880");
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// properties.put("metadata.max.age.ms", 15000); // refresh topic partition leadership every 15 seconds
properties.put("sasl.jaas.config", jaasCfg);
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
properties.put("ssl.client.auth", "none");

I also set the KAFKA_OPTS environment variable to point at the JAAS config location so the console 
consumer can use that login module. I am running a single-node Kafka (0.10.2) with ZooKeeper (3.4.9). 
With these settings both the broker and ZooKeeper come up, but clients with valid credentials are not 
able to write to or read from the broker. I pretty much followed the steps in the Apache Kafka 
documentation. Please advise.
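
(For illustration only: a self-contained producer built from the client properties above; the topic name is an assumption, and the broker address mirrors the listener configured above.)

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SaslPlainProducer {
    public static void main(String[] args) throws Exception {
        // Same SASL/PLAIN credentials as in the report; alternatively KAFKA_OPTS can
        // point at a JAAS file instead of using sasl.jaas.config.
        String jaasCfg = String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
            "test", "test");

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9090");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasCfg);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "hello".getBytes())).get();
        }
    }
}
{code}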



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8221) Augment LeaveGroupRequest to batch operation

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837646#comment-16837646
 ] 

ASF GitHub Bot commented on KAFKA-8221:
---

abbccdda commented on pull request #6714: KAFKA-8221 & KIP-345 part-3: Add 
batch leave group request
URL: https://github.com/apache/kafka/pull/6714
 
 
   We are aiming to support batch leave group requests issued from the admin client. 
This diff is the first effort to bump the leave group request version.
   
   Note that we relax the broker-side state check that restricts a request 
with both `member.id` and `group.instance.id` set. Although that combination is not 
possible at the moment, we don't want to restrict future development that may 
handle leave group from static members too.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Augment LeaveGroupRequest to batch operation
> 
>
> Key: KAFKA-8221
> URL: https://issues.apache.org/jira/browse/KAFKA-8221
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Having a batch leave group request is a required protocol change to remove a 
> set of static members all at once.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837653#comment-16837653
 ] 

Guozhang Wang commented on KAFKA-8335:
--

[~boquan] [~weichu] Thanks for reporting this issue. Jason and I have talked 
about it, and Jason has proposed a solution that we'll try to get into 
the upcoming 2.3.0 release.

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent a mail to the Kafka users mailing list 
> regarding this issue, but we think it's worth having a ticket to track it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partition went over 30G. This caused Kafka to take quite long to load
> "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams commits transactions every 100 ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8354) Replace SyncGroup request/response with automated protocol

2019-05-10 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8354:
--

 Summary: Replace SyncGroup request/response with automated protocol
 Key: KAFKA-8354
 URL: https://issues.apache.org/jira/browse/KAFKA-8354
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8355) Add static membership to Range assignor

2019-05-10 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8355:
--

 Summary: Add static membership to Range assignor
 Key: KAFKA-8355
 URL: https://issues.apache.org/jira/browse/KAFKA-8355
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837664#comment-16837664
 ] 

ASF GitHub Bot commented on KAFKA-8335:
---

hachikuji commented on pull request #6715: KAFKA-8335; Clean empty batches when 
sequence numbers are reused
URL: https://github.com/apache/kafka/pull/6715
 
 
   The log cleaner attempts to preserve the last entry for each producerId in 
order to ensure that sequence/epoch state is not lost. The current validation 
checks only the last sequence number for each producerId in order to decide 
whether a batch should be retained. There are two problems with this:
   
   1. Sequence numbers are not unique alone. It is the tuple of sequence number 
and epoch which is uniquely defined.
   2. The group coordinator always writes batches beginning with sequence 
number 0, which means there could be many batches which have the same sequence 
number.
   
   The complete fix for the second issue is probably to do the proper sequence 
number bookkeeping in the coordinator. For now, I have left the coordinator 
implementation unchanged and changed the cleaner logic to use the last offset 
written by a producer instead of the last sequence number. 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent a mail to the Kafka users mailing list 
> regarding this issue, but we think it's worth having a ticket to track it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partition went over 30G. This caused Kafka to take quite long to load
> "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams commits transactions every 100 ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8356) Add static membership to Round Robin assignor

2019-05-10 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8356:
--

 Summary: Add static membership to Round Robin assignor
 Key: KAFKA-8356
 URL: https://issues.apache.org/jira/browse/KAFKA-8356
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8224) Add static member id into Subscription Info for better rebalance behavior

2019-05-10 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8224:
---
Issue Type: Improvement  (was: Sub-task)
Parent: (was: KAFKA-7018)

> Add static member id into Subscription Info for better rebalance behavior
> -
>
> Key: KAFKA-8224
> URL: https://issues.apache.org/jira/browse/KAFKA-8224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Based on discussion in [https://github.com/apache/kafka/pull/6177] and 
> KIP-345, we plan to better utilize static member info to make wise rebalance 
> call, such as assignors like Range or Round Robin could become more sticky to 
> rely on static ids instead of coordinator auto-generated ids.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
External issue URL: https://github.com/apache/kafka/pull/6716

> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
Description: 
The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.

KIP-461: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

  was:The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.


> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
> KIP-461: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
External issue URL:   (was: https://github.com/apache/kafka/pull/6716)

> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
> KIP-461: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837682#comment-16837682
 ] 

ASF GitHub Bot commented on KAFKA-8352:
---

rhauch commented on pull request #6713: KAFKA-8352 : Fix Connect System test 
failure 404 Not Found
URL: https://github.com/apache/kafka/pull/6713
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-8352:
-
Reviewer: Randall Hauch

> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8352) Connect System Tests are failing with 404

2019-05-10 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8352.
--
Resolution: Fixed

Thanks, [~mageshn]!

> Connect System Tests are failing with 404
> -
>
> Key: KAFKA-8352
> URL: https://issues.apache.org/jira/browse/KAFKA-8352
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.2, 2.3.0, 2.1.2, 2.2.1
>
>
> [https://github.com/apache/kafka/pull/6651] modified the way the Connect REST 
> server is started. As a result, the Connect system tests are failing with a 
> 404 error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune reassigned KAFKA-8346:
-

Assignee: Aishwarya Gune

> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Assignee: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
> KIP-461: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
Description: 
The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.

KIP-461: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

PR draft: [https://github.com/apache/kafka/pull/6716]

  was:
The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.

KIP-461: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]


> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Assignee: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
> KIP-461: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]
> PR draft: [https://github.com/apache/kafka/pull/6716]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-10 Thread Aishwarya Gune (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aishwarya Gune updated KAFKA-8346:
--
Description: 
The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.

KIP-461: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

  was:
The replica fetcher thread terminates in case one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread behavior 
can be improved by dropping that particular partition and continuing with the 
rest of the partitions.

KIP-461: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]

PR draft: [https://github.com/apache/kafka/pull/6716]


> Improve replica fetcher behavior in handling partition failures
> ---
>
> Key: KAFKA-8346
> URL: https://issues.apache.org/jira/browse/KAFKA-8346
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Aishwarya Gune
>Assignee: Aishwarya Gune
>Priority: Major
>
> The replica fetcher thread terminates in case one of the partitions being 
> monitored fails. This leads to under-replicated partitions. The thread 
> behavior can be improved by dropping that particular partition and continuing 
> with the rest of the partitions.
> KIP-461: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator

2019-05-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837729#comment-16837729
 ] 

Guozhang Wang commented on KAFKA-7206:
--

[~sagarrao] I think [~shung] has tried to implement this idea before, but it 
turned out to be quite complex (maybe Yishun can give you some pointers to his 
past contributions). If this is your first work in the codebase, that may be very 
ambitious. Anyway, it is up to you and Yishun.

> Enable batching in FindCoordinator
> --
>
> Key: KAFKA-7206
> URL: https://issues.apache.org/jira/browse/KAFKA-7206
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>  Labels: needs-discussion, needs-kip, newbie++
>
> To quote [~guozhang] :
> "The proposal is that, we extend FindCoordinatorRequest to have multiple 
> consumer ids: today each FindCoordinatorRequest only contains a single 
> consumer id, so in our scenario we need to send N request for N consumer 
> groups still. If we can request for coordinators in a single request, then 
> the workflow could be simplified to:
>  # send a single FindCoordinatorRequest to a broker asking for coordinators 
> of all consumer groups.
>  1.a) note that the response may still succeed in finding some coordinators 
> while erroring on others, and we need to handle them at that granularity (see 
> below).
>  # and then for the collected coordinator, group them by coordinator id and 
> send one request per coordinator destination.
> Note that this change would require the version to be bumped up, to 
> FIND_COORDINATOR_REQUEST_V3 for such protocol changes, also the RESPONSE 
> version should be bumped up in order to include multiple coordinators."
> A KIP is needed.
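
(For illustration only: a sketch of the client-side grouping step from the quoted workflow, assuming a batched lookup has already returned a coordinator per group; the classes and the placeholder coordinator assignment are hypothetical, not actual Kafka code.)

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorGrouping {

    // Hypothetical result of a single batched FindCoordinator lookup: groupId -> coordinator broker id.
    static Map<String, Integer> coordinatorsFor(List<String> groupIds) {
        Map<String, Integer> result = new HashMap<>();
        for (String groupId : groupIds) {
            result.put(groupId, Math.abs(groupId.hashCode()) % 3); // placeholder assignment
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> groups = Arrays.asList("group-a", "group-b", "group-c", "group-d");

        // Step 1: one request for the coordinators of all groups (simulated above).
        Map<String, Integer> coordinators = coordinatorsFor(groups);

        // Step 2: group by coordinator so only one follow-up request per broker is needed.
        Map<Integer, List<String>> byCoordinator = new HashMap<>();
        coordinators.forEach((group, broker) ->
            byCoordinator.computeIfAbsent(broker, b -> new ArrayList<>()).add(group));

        byCoordinator.forEach((broker, groupList) ->
            System.out.println("broker " + broker + " <- " + groupList));
    }
}
{code}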



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8347) Choose next record to process by timestamp

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837764#comment-16837764
 ] 

ASF GitHub Bot commented on KAFKA-8347:
---

ableegoldman commented on pull request #6719: KAFKA-8347: Choose next record to 
process by timestamp
URL: https://github.com/apache/kafka/pull/6719
 
 
   When choosing the next record to process, we should look at each partition's 
head record timestamp and choose the lowest, rather than choosing the partition 
with the lowest stream time.
   
   This change effectively makes RecordQueue return the timestamp of the head 
record rather than its streamtime. Streamtime is removed (replaced) from 
RecordQueue as it was only being tracked in order to choose the next partition 
to poll from.
   
   Will add some unit tests soon
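
(For illustration only: a sketch of the selection rule described above, comparing head-record timestamps across partitions; the classes are simplified stand-ins, not the actual PartitionGroup/RecordQueue code.)

{code:java}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

public class NextRecordChooser {

    static class StampedRecord {
        final long timestamp;
        StampedRecord(long timestamp) { this.timestamp = timestamp; }
    }

    static class PartitionQueue {
        final String name;
        final Deque<StampedRecord> records = new ArrayDeque<>();
        PartitionQueue(String name) { this.name = name; }
    }

    // Pick the partition whose head record has the lowest timestamp, rather than
    // the partition with the lowest observed stream time.
    static Optional<PartitionQueue> next(List<PartitionQueue> partitions) {
        return partitions.stream()
            .filter(p -> !p.records.isEmpty())
            .min(Comparator.comparingLong((PartitionQueue p) -> p.records.peekFirst().timestamp));
    }

    public static void main(String[] args) {
        PartitionQueue a = new PartitionQueue("A");
        a.records.add(new StampedRecord(1)); // head record is older than A's stream time
        a.records.add(new StampedRecord(9)); // out-of-order data already pushed stream time up
        PartitionQueue b = new PartitionQueue("B");
        b.records.add(new StampedRecord(3));

        next(Arrays.asList(a, b)).ifPresent(p -> System.out.println("poll from partition " + p.name));
    }
}
{code}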
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Choose next record to process by timestamp
> --
>
> Key: KAFKA-8347
> URL: https://issues.apache.org/jira/browse/KAFKA-8347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Currently PartitionGroup will determine the next record to process by 
> choosing the partition with the lowest stream time. However if a partition 
> contains out of order data its stream time may be significantly larger than 
> the timestamp of the next record. The next record should instead be chosen as 
> the record with the lowest timestamp across all partitions, regardless of 
> which partition it comes from or what its partition time is.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837765#comment-16837765
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] After I directed you to check out RecordQueue I went back to 
look over it and filed the ticket John linked: you're right, it actually was 
going by partition time rather than by the timestamp of the head record. I've 
opened a simple PR with the fix 
[here|https://github.com/apache/kafka/pull/6719].

That said, I'm not sure this actually affects your use case. If all the data is 
in order, partition time should be the same as the head record's timestamp, so 
this should only come into play when processing out of order data. In your 
example above, partition A would have streamtime = 1 when first choosing the 
next record, as it will not have seen the record with timestamp 4 yet.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)
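
(For illustration only: a minimal stream-stream join as the 2.2 API appears to allow it; topic names and serdes are placeholders. Note that the window and grace configuration go through JoinWindows and Joined, and there is no join overload taking Materialized, which is the gap described above.)

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class JoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        // Grace period is configurable here, but there is no join overload taking
        // Materialized, so the join store's retention cannot be set independently.
        left.join(
            right,
            (l, r) -> l + "|" + r,
            JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)),
            Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        builder.build();
    }
}
{code}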



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference

2019-05-10 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8332.

Resolution: Fixed
  Assignee: Bob Barrett  (was: Konstantine Karantasis)

> Regression in handling of JoinGroupRequest disallows deterministic protocol 
> selection based on order of preference
> --
>
> Key: KAFKA-8332
> URL: https://issues.apache.org/jira/browse/KAFKA-8332
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Konstantine Karantasis
>Assignee: Bob Barrett
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When a group of Kafka clients includes more than one embedded protocol in its 
> {{JoinGroupRequest}} along with its metadata, the group membership protocol 
> defines that the protocol which is supported by all the members of a group is 
> selected, and if more than one protocol is supported by all the members, the 
> protocol is selected based on the order of preference as defined in the 
> {{JoinGroupRequest}}. 
> A recent change from type {{List}} to type {{Set}} for storing the set of 
> supported embedded protocols in the {{JoinGroupRequest}}, combined with the 
> previous handling based on implicit types in the Scala code, has introduced 
> non-determinism in the selection of the embedded protocol by the 
> {{GroupCoordinator}}, even though the underlying type of the Set in use is a 
> variant of LinkedHashSet (it respects order). 
> The relevant code is: 
> {code:java}
> // KafkaApis.scala
> val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
>   (protocol.name, protocol.metadata)).toList
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837770#comment-16837770
 ] 

ASF GitHub Bot commented on KAFKA-8332:
---

hachikuji commented on pull request #6695: KAFKA-8332: Restore determinism in 
protocol selection during JoinGroupRequest handling
URL: https://github.com/apache/kafka/pull/6695
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in handling of JoinGroupRequest disallows deterministic protocol 
> selection based on order of preference
> --
>
> Key: KAFKA-8332
> URL: https://issues.apache.org/jira/browse/KAFKA-8332
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Konstantine Karantasis
>Assignee: Bob Barrett
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When a group of Kafka clients includes more than one embedded protocol in its 
> {{JoinGroupRequest}} along with its metadata, the group membership protocol 
> defines that the protocol which is supported by all the members of a group is 
> selected, and if more than one protocol is supported by all the members, the 
> protocol is selected based on the order of preference as defined in the 
> {{JoinGroupRequest}}. 
> A recent change from type {{List}} to type {{Set}} for storing the set of 
> supported embedded protocols in the {{JoinGroupRequest}}, combined with the 
> previous handling based on implicit types in the Scala code, has introduced 
> non-determinism in the selection of the embedded protocol by the 
> {{GroupCoordinator}}, even though the underlying type of the Set in use is a 
> variant of LinkedHashSet (it respects order). 
> The relevant code is: 
> {code:java}
> // KafkaApis.scala
> val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
>   (protocol.name, protocol.metadata)).toList
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline

2019-05-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837772#comment-16837772
 ] 

ASF GitHub Bot commented on KAFKA-8171:
---

hachikuji commented on pull request #6515: KAFKA-8171: Set callback to null in 
addStopReplicaRequestForBrokers when replica …
URL: https://github.com/apache/kafka/pull/6515
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> callback needs to be null when addStopReplicaRequestForBrokers when replica 
> state transits to offline
> -
>
> Key: KAFKA-8171
> URL: https://issues.apache.org/jira/browse/KAFKA-8171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: kehu
>Priority: Major
>
> Problem: 
> In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
> it will try to group the requests based on deletePartition flag and callback:
> val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
> !r.deletePartition && r.callback == null)
> When both conditions are met, the controller is expected to send only one request to 
> the destination broker. However, when adding the requests in ReplicaStateMachine, 
> it puts in a non-null callback (_,_)=>(). Therefore, replicasToGroup is 
> always empty and the controller always first sends an empty request followed 
> by #partitions requests.
>  
> Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica 
> state transitions to offline. PR has been created: 
> https://github.com/apache/kafka/pull/6515



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline

2019-05-10 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8171.

Resolution: Fixed

This was fixed by KAFKA-8237.

> callback needs to be null when addStopReplicaRequestForBrokers when replica 
> state transits to offline
> -
>
> Key: KAFKA-8171
> URL: https://issues.apache.org/jira/browse/KAFKA-8171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: kehu
>Priority: Major
>
> Problem: 
> In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
> it will try to group the requests based on deletePartition flag and callback:
> val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
> !r.deletePartition && r.callback == null)
> When both conditions are met, the controller is expected to send only one request to 
> the destination broker. However, when adding the requests in ReplicaStateMachine, 
> it puts in a non-null callback (_,_)=>(). Therefore, replicasToGroup is 
> always empty and the controller always first sends an empty request followed 
> by #partitions requests.
>  
> Fix: set the callback to null in addStopReplicaRequestForBrokers when the replica 
> state transitions to offline. PR has been created: 
> https://github.com/apache/kafka/pull/6515



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)