[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838565#comment-16838565
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 3:52 PM:


This is the test enhanced to use timestamp extraction, and it works:

[https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]

 

So it would seem that the issue lies in how the data is read when the source 
topics are already fully populated. As we discussed previously, it seems to 
simply read all the left records first, then the right records. How can we 
throttle the ingestion of the records to avoid this?
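
One setting that may be relevant to this question, offered here as an assumption rather than something confirmed in this thread, is the Kafka Streams {{max.task.idle.ms}} configuration (available since Kafka 2.1). It bounds how long a task stays idle waiting for all of its input partitions to have buffered records before it processes what it already has. A minimal sketch using plain {{java.util.Properties}}; the class name, application id, broker address and the 5000 ms value are hypothetical placeholders:

```java
import java.util.Properties;

// Sketch only: every value below is a hypothetical placeholder.
class IdleConfigSketch {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "join-example");      // hypothetical id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // Let a task wait up to 5 s for data on all of its input partitions
        // before processing the partitions that already have records.
        props.put("max.task.idle.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms"));
    }
}
```

These properties would be passed to the {{KafkaStreams}} constructor in the usual way. Whether this changes the behaviour described above has not been tested here.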


was (Author: the4thamigo_uk):
This is the test enhanced to use timestamp extraction, and it works:

[https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]

 

So, it would seem that the issue is how the data is read when the data is 
already fully populated in the source topics.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems to be supported only for the group operation.
>  
> Slack conversation here: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838565#comment-16838565
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 3:45 PM:


This is the test enhanced to use timestamp extraction, and it works:

[https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]

 

So, it would seem that the issue is how the data is read when the data is 
already fully populated in the source topics.


was (Author: the4thamigo_uk):
This is the test enhanced to use timestamp extraction, and it works:

[https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]

 

So, it would seem that the issue is how the data is read when the data already 
exists in the topics.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838565#comment-16838565
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 2:17 PM:


This is the test enhanced to use timestamp extraction, and it works:

[https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]

 

So, it would seem that the issue is how the data is read when the data already 
exists in the topics.


was (Author: the4thamigo_uk):
This is the test enhanced to use timestamp extraction, and it works:

https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838550#comment-16838550
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 1:50 PM:


This test appears to work OK:
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]

Differences from join-example are:
1) It uses TopologyTestDriver, which means data is not pre-populated in 
topics.
2) I'm not using timestamp extractors.


was (Author: the4thamigo_uk):
This test appears to work ok 
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794.|https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838550#comment-16838550
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 1:48 PM:


This test appears to work OK:
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]


was (Author: the4thamigo_uk):
This test appears to work ok 
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794.]



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838550#comment-16838550
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 1:48 PM:


This test appears to work OK:
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]


was (Author: the4thamigo_uk):
This test appears to work ok 
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794.|https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838471#comment-16838471
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 12:52 PM:

[~vvcephei] [~ableegoldman] I was just looking at the unit tests for 
PartitionGroup and noticed that the comment refers to timestamps, but the 
ConsumerRecord constructor that is used passes the value in the offset 
parameter:

[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java#L92]

Is this correct?

 

Update: I see now that the test uses MockTimestampExtractor, which takes the 
offset as the timestamp:

[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java]

 


was (Author: the4thamigo_uk):
[~vvcephei] [~ableegoldman] I was just looking at the unit tests for 
PartitionGroup and noticed that the comment refers to timestamps, but the 
ConsumerRecord constructor that is used passes the value in the offset 
parameter:

[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java#L92]

Is this correct?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838012#comment-16838012
 ] 

Andrew edited comment on KAFKA-8315 at 5/13/19 7:10 AM:


[~ableegoldman] Right, I see what you mean: this loop only runs until the head 
is found 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L158].
We do have out-of-order data in our real data streams; however, it looks like 
you are right that it shouldn't affect my {{join-example}} demo, which 
reproduces the issue with only ordered data. 

Any further ideas on why the {{join-example}} doesn't work? If it is a different 
bug, shall we open a new ticket, as this current one is not really relevant 
anymore?


was (Author: the4thamigo_uk):
[~ableegoldman] Right, I see what you mean: this loop only runs until the head 
is found 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L158].
We do have out-of-order data in our real data streams; however, it looks like 
you are right that it shouldn't affect my {{join-example}} demo, which 
reproduces the issue with only ordered data. Any further ideas on why the 
{{join-example}} doesn't work?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:48 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by {{RecordQueue.partitionTime}}, and 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand, we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.
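
The two orderings in the A/B example can be compared with a toy model. This is only a sketch of the reasoning in this comment, not the Kafka Streams implementation; the class and method names are hypothetical, and each queue is reduced to a deque of timestamps.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: names are hypothetical, not Kafka Streams internals.
class QueueSelectionSketch {

    // Highest timestamp buffered so far (the reading of partitionTime used above).
    static long maxBuffered(Deque<Long> queue) {
        long max = Long.MIN_VALUE;
        for (long ts : queue) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Index of the non-empty queue with the smallest "highest buffered timestamp".
    static int pickByMaxBuffered(List<Deque<Long>> queues) {
        int best = -1;
        for (int i = 0; i < queues.size(); i++) {
            if (queues.get(i).isEmpty()) continue;
            if (best < 0 || maxBuffered(queues.get(i)) < maxBuffered(queues.get(best))) {
                best = i;
            }
        }
        return best;
    }

    // Index of the non-empty queue whose head record has the smallest timestamp.
    static int pickByHead(List<Deque<Long>> queues) {
        int best = -1;
        for (int i = 0; i < queues.size(); i++) {
            if (queues.get(i).isEmpty()) continue;
            if (best < 0 || queues.get(i).peekFirst() < queues.get(best).peekFirst()) {
                best = i;
            }
        }
        return best;
    }

    // Queue A (index 0) holds timestamps 1 and 4; queue B (index 1) holds 2 and 3.
    static List<Deque<Long>> example() {
        Deque<Long> a = new ArrayDeque<>(Arrays.asList(1L, 4L));
        Deque<Long> b = new ArrayDeque<>(Arrays.asList(2L, 3L));
        return Arrays.asList(a, b);
    }

    public static void main(String[] args) {
        List<Deque<Long>> queues = example();
        int byMax = pickByMaxBuffered(queues);
        int byHead = pickByHead(queues);
        System.out.println("by highest buffered: queue " + byMax
                + ", next record " + queues.get(byMax).peekFirst());
        System.out.println("by head record: queue " + byHead
                + ", next record " + queues.get(byHead).peekFirst());
    }
}
```

In this model, ordering by the highest buffered timestamp picks B (next record 2), while ordering by the head record picks A (next record 1), which matches the two cases described above.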

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} instances are populated in chunks of 10, then initially the left 
stream would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So the left stream would be selected first, until all 
records up to time 100 in the left stream are consumed, and only then would the 
right stream records be consumed.

In this case, wouldn't many of the left join windows expire before the first 
record is read from the {{RecordQueue}} of the right stream?

+++ For convenience, I am quoting these times as offsets in seconds from the 
start time 190258000ms.
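
To make the expiry concern concrete, here is a toy model of the two consumption orders. It is not Kafka Streams code and does not reproduce its retention semantics. The assumptions are all hypothetical: each stream has one record per second for timestamps 1 to 100, a right record joins only the left record with the same timestamp, and a buffered left record expires once stream time has moved more than 20 seconds past it.

```java
// Toy model only: names, record counts and the retention value are hypothetical.
class JoinExpirySketch {

    // Left stream fully consumed first, so stream time is already at the last
    // left timestamp when the first right record is processed.
    static int matchesWhenLeftReadFirst(int records, long retention) {
        long streamTime = records;
        int matches = 0;
        for (long t = 1; t <= records; t++) {
            // Right record t only joins if left record t has not expired.
            if (streamTime - t <= retention) {
                matches++;
            }
        }
        return matches;
    }

    // Records consumed in timestamp order, alternating left then right.
    static int matchesWhenInterleaved(int records, long retention) {
        long streamTime = 0;
        int matches = 0;
        for (long t = 1; t <= records; t++) {
            streamTime = Math.max(streamTime, t); // left record t consumed
            if (streamTime - t <= retention) {    // right record t consumed next
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println("left first:  " + matchesWhenLeftReadFirst(100, 20));
        System.out.println("interleaved: " + matchesWhenInterleaved(100, 20));
    }
}
```

Under these assumptions, reading the left stream first leaves only 21 of the 100 right records with a live partner, while interleaving by timestamp matches all 100.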


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So, the left stream would be selected first, until all 
records are consumed in the left stream, then the right stream records would be 
consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:47 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 100}}. So, the left stream would be selected first, until all 
records are consumed in the left stream, then the right stream records would be 
consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 1000}}. So, the left stream would be selected first, until 
all records are consumed in the left stream, then the right stream records 
would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:43 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +++) and the right stream 
{{partitionTime = 1000}}. So, the left stream would be selected first, until 
all records are consumed in the left stream, then the right stream records 
would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+++ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime 
= 1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}*** and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

*** for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity-check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} (see +) and the right stream {{partitionTime 
= 1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:42 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}+ and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

+ for convenience I am quoting these times as second offsets from the start 
time 190258000ms


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}}*** and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?

*** for convenience I am quoting these times as second offsets from the start 
time 190258000ms



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:40 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.

Taking this thought further, in my test data set, if I assume that the 
{{fifoQueue}} are populated in chunks of 10, then initially the left stream 
would have {{partitionTime = 10}} and the right stream {{partitionTime = 
1000}}. So, the left stream would be selected first, until all records are 
consumed in the left stream, then the right stream records would be consumed.

In this case, wouldn't the vast majority of the left join windows expire before 
the first record is read from the {{RecordQueue}} of the right stream?


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:32 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each 
{{RecordQueue.fifoQueue}}, then A would be selected, and record 1 would be read 
first.


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each {{RecordQueue}}, 
then A would be selected, and record 1 would be read first.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:31 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?

If, on the other hand we ordered by the earliest time in each {{RecordQueue}}, 
then A would be selected, and record 1 would be read first.


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:27 AM:


[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{PartitionGroup.nonEmptyQueuesByTime}} orders the 
{{RecordQueue}} instances by the {{RecordQueue.partitionTime}}. The 
{{RecordQueue.partitionTime}} is the most recent timestamp that has been read 
into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{PartitionGroup.nonEmptyQueuesByTime.poll()}} and the next record will be 2, 
not 1. Is that right?


was (Author: the4thamigo_uk):
[~ableegoldman] [~vvcephei] Can I sanity check my understanding here? From what 
I can tell, the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances 
by the {{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the 
most recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew edited comment on KAFKA-8315 at 5/10/19 9:26 AM:


[~ableegoldman] Can I sanity check my understanding here? From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected by 
{{nonEmptyQueuesByTime.poll()}} and the next record will be 2, not 1. Is that 
right?


was (Author: the4thamigo_uk):
[~ableegoldman] Can I sanity check my understanding here? From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected and the next record 
will be 2, not 1. Is that right?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:04 PM:


Thanks [~ableegoldman] and [~vvcephei].

Today I tried to isolate the issue we are facing in a simple demo app. I 
have two streams of data, x and y: x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat).

Both topics are preloaded into kafka here 
https://github.com/the4thamigo-uk/join-example/blob/master/create_topics.sh#L3

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

Today I tried to isolate the issue we are facing in a simple demo app. I 
have two streams of data, x and y: x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat).

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:03 PM:


Thanks [~ableegoldman] and [~vvcephei].

Today I tried to isolate the issue we are facing in a simple demo app. I 
have two streams of data, x and y: x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat).

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

Today I tried to isolate the issue we are facing in a simple demo app. I 
have two streams of data, x and y: x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat).

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 9:59 PM:
---

Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:58 AM:


[~vvcephei] Thanks again for your help and advice. 

Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =
        builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

    // set up the join: right records up to 2 days before the left record, 7 days of grace
    final JoinWindows joinWindow =
        JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

    final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream,
        (l, r) -> {
            log.info("joining: " + l + ", " + r);
            return null;
        }, joinWindow);

    return builder.build();
}
{code}
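
As a rough illustration of what the join window in the code above means, here is a minimal sketch in plain Java with no Kafka dependency. It assumes the documented JoinWindows semantics, namely that a right record is joinable when its timestamp lies in [leftTs - before, leftTs + after]. The class and method names are hypothetical, and grace is deliberately left out because it affects how long windows are kept, not which timestamps match.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams.
final class JoinWindowSketch {

    /** True if rightTs falls inside [leftTs - before, leftTs + after] (all in milliseconds). */
    static boolean joinable(final long leftTs, final long rightTs,
                            final Duration before, final Duration after) {
        return rightTs >= leftTs - before.toMillis()
            && rightTs <= leftTs + after.toMillis();
    }

    public static void main(final String[] args) {
        final long day = Duration.ofDays(1).toMillis();
        final Duration before = Duration.parse("P2D"); // as in the code above
        final Duration after = Duration.ZERO;          // JoinWindows.of(Duration.ZERO)

        // A right record one day older than the left record is inside the window.
        System.out.println(joinable(10 * day, 9 * day, before, after));
        // A right record three days older is outside it.
        System.out.println(joinable(10 * day, 7 * day, before, after));
        // A right record newer than the left record is outside it as well.
        System.out.println(joinable(10 * day, 10 * day + 1, before, after));
    }
}
```

With a daily right stream and a minute-by-minute left stream, each left record should therefore find the one or two most recent daily records, provided they are still retained when the left record is processed.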
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:57 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =
        builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

    // set up the join: right records up to 2 days before the left record, 7 days of grace
    final JoinWindows joinWindow =
        JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

    final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream,
        (l, r) -> {
            log.info("joining: " + l + ", " + r);
            return null;
        }, joinWindow);

    return builder.build();
}
{code}
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:57 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =
        builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

    // set up the join: right records up to 2 days before the left record, 7 days of grace
    final JoinWindows joinWindow =
        JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

    final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream,
        (l, r) -> {
            log.info("joining: " + l + ", " + r);
            return null;
        }, joinWindow);

    return builder.build();
}
{code}
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:56 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp, offset and key of every record read.
    // NOTE: the generic type parameters are assumed from transform()'s signature.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =
        builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

    // set up the join: right records up to 2 days before the left record, 7 days of grace
    final JoinWindows joinWindow =
        JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

    final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream,
        (l, r) -> {
            log.info("joining: " + l + ", " + r);
            return null;
        }, joinWindow);

    return builder.build();
}
{code}


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:43 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped, so maybe our working 
hypothesis is incorrect then. As you suggest, I am currently re-running a test 
using the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835832#comment-16835832
 ] 

John Roesler edited comment on KAFKA-8315 at 5/8/19 7:17 PM:
-

Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Streams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat: when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but will instead process the side that does have data. This is 
controlled by the "max idle ms" config (max.task.idle.ms), but since you're 
processing historically, this shouldn't be your problem. Still might be worth 
a look.
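
To make the selection rule above concrete, here is a minimal sketch in plain Java. It is not the Kafka Streams implementation; the class name, the "x@"/"y@" labels, the tie-break in favour of the left side and the use of bare timestamps as records are all assumptions made for illustration. It always takes the buffered head record with the lower timestamp, and once one side is empty it simply drains the other, which corresponds to not waiting at all for more data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: a toy model of timestamp-ordered consumption from two inputs.
final class TimestampOrderedMerge {

    /** Repeatedly takes the head record with the lower timestamp; ties go to the left. */
    static List<String> merge(final long[] leftTs, final long[] rightTs) {
        final Deque<Long> left = new ArrayDeque<>();
        final Deque<Long> right = new ArrayDeque<>();
        for (final long t : leftTs) {
            left.add(t);
        }
        for (final long t : rightTs) {
            right.add(t);
        }

        final List<String> order = new ArrayList<>();
        while (!left.isEmpty() || !right.isEmpty()) {
            // If one side has no buffered data, process the side that does.
            final boolean takeLeft = right.isEmpty()
                || (!left.isEmpty() && left.peek() <= right.peek());
            if (takeLeft) {
                order.add("x@" + left.poll());
            } else {
                order.add("y@" + right.poll());
            }
        }
        return order;
    }

    public static void main(final String[] args) {
        // x is high frequency, y is low frequency.
        System.out.println(merge(new long[] {1, 2, 3, 4, 5, 6}, new long[] {3, 6}));
        // prints [x@1, x@2, x@3, y@3, x@4, x@5, x@6, y@6]
    }
}
```

If the logged read order looks like "all x, then all y" instead of an interleaving like this, one side probably had no buffered data at the moment the choice was made.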

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.
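
The two-producer scenario can be sketched in plain Java as follows. This is a simplified model, not the actual store code: the class name is hypothetical, "retention" stands in for however long old windows are kept (which is what the grace period influences), and the real store works on segments rather than on individual timestamps. Stream time is the highest timestamp seen so far, and a record is rejected once it is older than stream time minus retention.

```java
import java.time.Duration;

// Illustrative only: a toy model of stream time and expiry.
final class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE;
    private final long retentionMs;

    StreamTimeSketch(final Duration retention) {
        this.retentionMs = retention.toMillis();
    }

    /** Advances stream time and returns true if the record is still within retention. */
    boolean observe(final long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        return timestampMs >= streamTime - retentionMs;
    }

    public static void main(final String[] args) {
        final long hour = Duration.ofHours(1).toMillis();
        final StreamTimeSketch shortRetention = new StreamTimeSketch(Duration.ofHours(1));
        final StreamTimeSketch longRetention = new StreamTimeSketch(Duration.ofHours(24));

        // The first producer has written a full day, so stream time is now 24h.
        shortRetention.observe(24 * hour);
        longRetention.observe(24 * hour);

        // The second producer starts again at 0h: stream time "freezes" at 24h.
        System.out.println(shortRetention.observe(0)); // false: 0h is older than 24h - 1h
        System.out.println(longRetention.observe(0));  // true: 0h is not older than 24h - 24h
    }
}
```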

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.

Minor point, you should not need to mess with the retention of the changelog 
topic. Streams sets this appropriately to preserve the same data as the store, 
but this is only apparent when restoring the store. The actual results of the 
join are served out of the state store, so only the state store's retention 
matters. This is what you're setting with the grace period.

I hope this helps!
-John


was (Author: vvcephei):
Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Sterams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat, that when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but instead just process the side that does have data. This is 
controlled by the "max idle ms" config, but since you're processing 
historically, this shouldn't be your problem. Still might be worth a look.

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.

Minor point, you should not need to 

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835784#comment-16835784
 ] 

Andrew edited comment on KAFKA-8315 at 5/8/19 5:54 PM:
---

After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
the left and right streams, but we believe that the right topic is being read 
at a rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time), as there are far fewer records, and therefore the right stream 
windows are being expired before the left stream data has been read.

[~vvcephei] What controls the rate at which records are read from the left and 
right streams? Is there any guarantee that the timestamps of the records read 
from the left stream are kept more-or-less in line with those of the records 
from the right stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 


was (Author: the4thamigo_uk):
After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
the left and right streams, but we believe that the right topic is being read 
at a rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time) and therefore the right stream windows are being expired before 
the left stream data has been read.

[~vvcephei] What controls the rate at which records are read from the left and 
right streams? Is there any guarantee that the timestamps of the records read 
from the left stream are kept more-or-less in line with those of the records 
from the right stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:53 PM:
---

I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
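@Override
public KeyValue transform(Object key, GenericRecord val) {
  // Re-stamp the joined record with the timestamp of its left-hand value.
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null; // the record has already been forwarded above
}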
{code}
 


was (Author: the4thamigo_uk):
Ive attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so shouldnt affect it.

{code:java}
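@Override
public KeyValue transform(Object key, GenericRecord val) {
  // Re-stamp the joined record with the timestamp of its left-hand value.
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null; // the record has already been forwarded above
}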
{code}
 



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:53 PM:
---

Ive attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so shouldnt affect it.

{code:java}
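@Override
public KeyValue transform(Object key, GenericRecord val) {
  // Re-stamp the joined record with the timestamp of its left-hand value.
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null; // the record has already been forwarded above
}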
{code}
 


was (Author: the4thamigo_uk):
Ive attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so shouldnt affect it.

{code:java}
@Override
public KeyValue transform(Object key, GenericRecord val) {
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
}
{code}



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:51 PM:
---

Ive attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so shouldnt affect it.

{code:java}
@Override
public KeyValue transform(Object key, GenericRecord val) {
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
}
{code}


was (Author: the4thamigo_uk):
Ive attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the joined 
record always has the timestamp of the left record. This is important for the 
aggregation phase.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 6:54 PM:
---

[~vvcephei] Update: I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?
 


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

 



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 6:54 PM:
---

[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

 


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

-For reference, I have uploaded the rowkey and timestamps of the kind of 
records we have in our streams: [https://github.com/the4thamigo-uk/join_data]-

I think the data I put here is incorrect; let me fix it.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 6:34 PM:
---

[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

-For reference, I have uploaded the rowkey and timestamps of the kind of 
records we have in our streams: [https://github.com/the4thamigo-uk/join_data]-

I think the data I put here is incorrect; let me fix it.


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I have uploaded the rowkey and timestamps of the kind of records 
we have in our streams : [https://github.com/the4thamigo-uk/join_data]



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 6:21 PM:
---

[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I have uploaded the rowkey and timestamps of the kind of records 
we have in our streams : [https://github.com/the4thamigo-uk/join_data]


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I have uploaded the rowkey and timestamps of all the records in 
both of our streams : https://github.com/the4thamigo-uk/join_data



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 5:32 PM:
---

[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I have uploaded the rowkey and timestamps of all the records in 
both of our streams : https://github.com/the4thamigo-uk/join_data


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I am attaching the row key and timestamp for all the records in 
our left and right topics.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 3:26 PM:
---

[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest.

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?

 

For reference, I am attaching the row key and timestamp for all the records in 
our left and right topics.


was (Author: the4thamigo_uk):
[~vvcephei] Update I ran using grace = 20D and I see records from around 23D 
prior to the latest data in the topics. So it seems that we definitely can join 
the records, but that the grace period is the critical factor in doing this for 
a historical ingest. 

So, for a 2 year ingestion do I have to set my grace to 2 years? Seems a bit 
strange, and maybe sounds a bit inefficient?



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 3:05 PM:
---

[~vvcephei] I've been struggling with this today.
 * I have logging on every join, and I can see that joins do not occur before 
the 6 day period, except in a few cases (see below)
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have added `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and only set the join window 
size to 2 days, and increased the join grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6 day window for a 
single partition (13) and this partition is the first to complete by far, as it 
has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked we 
have the same keys in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set the `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and the work for these tasks is distributed out to the stream threads in our 4 
streams application instances (processes). All instances share exactly the same 
configuration and, in particular, the same application.id. My assumption is 
that stream time is per-task (i.e. per partition). Is this correct? Is there 
_any_ possibility that the stream time of partition 13 is somehow shared with 
any of the other tasks, such that windows might be closed before the join-able 
data is read on the other partitions?

I'd like to understand some more about how stream-time increases. I imagine 
(probably naively) that within a task, stream time increases as the latest 
timestamp read from a partition, and that both left and right streams have 
their own stream time. I also assume that during a join, the left stream is 
read, up until just after the current right stream-time, then the right stream 
is read up until the latest left stream-time, so that data is pulled off both 
streams to minimize the difference in times between the latest records read off 
the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.

P.S. we are using the following versions
    2.2.0
    5.2.1



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 3:03 PM:
---

[~vvcephei] I've been struggling with this today.
 * I have logging on every join, and I can see that joins do not occur before 
the 6 day period, except in a few cases (see below).
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have added `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and only set the join window 
size to 2 days, and increased the join grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6 day window for a 
single partition (13) and this partition is the first to complete by far, as it 
has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked we 
have the same keys in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set the `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and the work for these tasks is distributed out to the stream threads in our 4 
streams application instances (processes). All instances share exactly the same 
configuration and, in particular, the same application.id. My assumption is 
that stream time is per-task (i.e. per partition). Is this correct? Is there 
_any_ possibility that the stream time of partition 13 is somehow shared with 
any of the other tasks, such that windows might be closed before the join-able 
data is read on the other partitions?

I'd like to understand more about how stream time increases. I imagine 
(probably naively) that within a task, stream time advances to the latest 
timestamp read from a partition, and that the left and right streams each have 
their own stream time. I also assume that during a join, the left stream is 
read up until just after the current right stream time, then the right stream 
is read up until the latest left stream time, so that data is pulled off both 
streams to minimize the difference in time between the latest records read off 
the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 2:38 PM:
---

[~vvcephei] I've been struggling with this today.
 * I have logging on every join, and I can see that joins do not occur before 
the 6 day period, except in a few cases (see below).
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have added `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and only set the join window 
size to 2 days, and increased the join grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6 day window for a 
single partition (13) and this partition is the first to complete by far, as it 
has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked we 
have the same keys in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set the `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and the work for these tasks is distributed out to the stream threads in our 4 
streams applications. My assumption is that stream time is per-task (i.e. per 
partition). Is this correct? Is there _any_ possibility that the stream time of 
partition 13 is somehow shared with any of the other tasks, such that windows 
might be closed before the join-able data is read on the other partitions?

I'd like to understand more about how stream time increases. I imagine 
(probably naively) that within a task, stream time advances to the latest 
timestamp read from a partition, and that the left and right streams each have 
their own stream time. I also assume that during a join, the left stream is 
read up until just after the current right stream time, then the right stream 
is read up until the latest left stream time, so that data is pulled off both 
streams to minimize the difference in time between the latest records read off 
the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 2:38 PM:
---

[~vvcephei] I've been struggling with this today.
 * I have logging on every join, and I can see that joins do not occur before 
the 6 day period, except in a few cases (see below).
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have added `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and only set the join window 
size to 2 days, and increased the join grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6 day window for a 
single partition (13) and this partition is the first to complete by far, as it 
has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked we 
have the same keys in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set the `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and the work for these tasks is distributed out to the stream threads in our 4 
streams applications. My assumption is that stream time is per-task (i.e. per 
partition). Is this correct? Is there _any_ possibility that the stream time of 
partition 13 is somehow shared with any of the other tasks, such that windows 
might be closed before the join-able data is read on the other partitions?

I'd like to understand more about how stream time increases. I imagine 
(probably naively) that within a task, stream time advances to the latest 
timestamp read from a partition, and that the left and right streams each have 
their own stream time. I also assume that during a join, the left stream is 
read up until just after the current right stream time, then the right stream 
is read up until the latest left stream time, so that data is pulled off both 
streams to minimize the difference in time between the latest records read off 
the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.



[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832325#comment-16832325
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 1:03 PM:
---

The fudge factor is `windowstore.changelog.additional.retention.ms`, which 
is set to 24h by default, so that seems to add up to 5 days (120 hours).
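
The arithmetic behind that statement can be sketched as follows. The config key and its 24-hour default are taken from this comment. The 96-hour base retention is not stated here; it is simply what remains when 24 hours is subtracted from the 120-hour total, so treat it as an assumption. The sketch also assumes the effective retention is just the base plus the additional setting, which is how the comment adds it up. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Properties;

final class RetentionSketch {

    static final String ADDITIONAL_RETENTION_KEY =
            "windowstore.changelog.additional.retention.ms";

    // Assumed model: effective retention = base retention + additional
    // retention, with the additional part falling back to 24 hours when unset.
    static Duration effectiveRetention(Duration baseRetention, Properties config) {
        String fallback = String.valueOf(Duration.ofHours(24).toMillis());
        long additionalMs =
                Long.parseLong(config.getProperty(ADDITIONAL_RETENTION_KEY, fallback));
        return baseRetention.plusMillis(additionalMs);
    }

    public static void main(String[] args) {
        Duration base = Duration.ofHours(96); // assumed: 120h total minus 24h default

        // With the default additional retention this gives 120 hours (5 days).
        System.out.println(effectiveRetention(base, new Properties()).toHours());

        // Raising the setting (here to an illustrative 48 hours) extends it.
        Properties overridden = new Properties();
        overridden.setProperty(ADDITIONAL_RETENTION_KEY,
                String.valueOf(Duration.ofHours(48).toMillis()));
        System.out.println(effectiveRetention(base, overridden).toHours());
    }
}
```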

You understand my use case well. The choice of limiting the grace was to reduce 
the performance overhead during the large historical processing.

I definitely have lots of data beyond this that should join, and the joins and 
aggregation within the latest 120 hours seem to be correct and complete. This 
seems to be related only to retention. I will experiment with increasing 
`windowstore.changelog.additional.retention.ms` to see if it brings in more 
data.

One oddity of our left stream is that it contains records in batches from 
different devices. Each batch is about 1000 records and contiguous within the 
stream. Within a batch the records are in increasing timestamp order. 
Subsequent batches from different devices will be within 1-2 days of the 
previous batch (we don't have, say, a batch for 1/1/2019 followed by a batch 
for 4/1/2019 or a batch for 24/12/2019). The right stream consists of single 
records, not batches, with similar date ordering (i.e. subsequent records should 
be within 1-2 days of each other).

My understanding is that the windows are closed when streamtime - windowstart > 
windowsize + grace. So as stream time increases as newer batches arrive, 
joins/aggregations should continue, until a batch arrives after windowsize + 
grace. However, we are not seeing this.

 

[Correction]

I think I meant:

streamtime - windowend > grace
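
As a minimal sketch, the two conditions above can be written as predicates in plain Java. The dates are illustrative, the 2-day size and 3-day grace are the values mentioned elsewhere in this thread, and the class and method names are hypothetical. This is not taken from the Kafka Streams source.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowCloseSketch {

    // Corrected form: closed once stream time has passed the window end
    // by more than the grace period.
    static boolean isClosed(Instant streamTime, Instant windowEnd, Duration grace) {
        return Duration.between(windowEnd, streamTime).compareTo(grace) > 0;
    }

    // Earlier form, measured from the window start instead.
    static boolean isClosedFromStart(Instant streamTime, Instant windowStart,
                                     Duration windowSize, Duration grace) {
        return Duration.between(windowStart, streamTime)
                .compareTo(windowSize.plus(grace)) > 0;
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("2019-01-01T00:00:00Z"); // illustrative
        Duration windowSize = Duration.ofDays(2);
        Duration grace = Duration.ofDays(3);
        Instant windowEnd = windowStart.plus(windowSize);

        Instant atBoundary = windowEnd.plus(grace);   // exactly end + grace
        Instant justAfter = atBoundary.plusMillis(1); // one millisecond later

        System.out.println(isClosed(atBoundary, windowEnd, grace)); // false
        System.out.println(isClosed(justAfter, windowEnd, grace));  // true
        System.out.println(
                isClosedFromStart(justAfter, windowStart, windowSize, grace)); // true
    }
}
```

Note that when windowend is defined as windowstart + windowsize, the two forms give the same answer for any stream time; they differ only if the window end is defined some other way.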





