[jira] [Commented] (KAFKA-8315) Historical join issues

2021-07-08 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377546#comment-17377546
 ] 

John Roesler commented on KAFKA-8315:
-

Hello again, all, this issue should be resolved in 3.0, via KAFKA-10091.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2020-12-04 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244330#comment-17244330
 ] 

John Roesler commented on KAFKA-8315:
-

Hello, all, I have just proposed KIP-695, which I think would resolve this 
issue. Please let me know what you think in the mailing list discussion thread!

[https://cwiki.apache.org/confluence/x/JSXZCQ]

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845088#comment-16845088
 ] 

ASF GitHub Bot commented on KAFKA-8315:
---

guozhangwang commented on pull request #6664: KAFKA-8315: fix the JoinWindows 
retention deprecation doc
URL: https://github.com/apache/kafka/pull/6664
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-17 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842568#comment-16842568
 ] 

Matthias J. Sax commented on KAFKA-8315:


Agreed. As mentioned above: 
https://issues.apache.org/jira/browse/KAFKA-8315?focusedCommentId=16841378&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16841378

It's a known issue tracked as KAFKA-7458

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-17 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842566#comment-16842566
 ] 

John Roesler commented on KAFKA-8315:
-

The existing behavior of the consumer is effectively to round-robin the inputs. 
If you're subscribed to A,B, and C, and you request 20 records, and it gets 15 
records from A and 5 from B, then the next time around, it should give you 
maybe 10 more from B and then 10 from C.

I actually think the problem might just be on startup (but would need to 
verify), since we have no visibility into which partitions have been polled at 
all. After startup, the Consumer behavior in addition to the existing pause 
logic should take care of preferring to poll partitions that are empty. If a 
partition is actually empty (we are caught up), then this is what the 
max.idle.time is for. But this ticket seems different, since we never even 
tried to poll all the inputs before starting work on just one side of the 
join Or maybe I'm not thinking about it clearly.

The point is, as a functional requirement, it seems like historical joins 
should function properly even with a zero idle time.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841710#comment-16841710
 ] 

Matthias J. Sax commented on KAFKA-8315:


Don't think that should be required? The way Kafka Streams' record queue work, 
should actually work around this behavior. If a partition is paused, `poll()` 
won't return any data for this partition, even if the consumer has buffered 
data.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841640#comment-16841640
 ] 

Andrew commented on KAFKA-8315:
---

Also, I had a thought whether you could modify 
[https://github.com/the4thamigo-uk/kafka/blob/debugging/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L485.]
 so that at least some records are fetched for a topic if there is a completed 
fetch for it. I think it drains the first one then the next, but you could 
imagine round robin, or timestamp based extraction from more than one completed 
fetch, in order to balance the delivery across streams?

All, just vague hand-wavey thoughts, I appreciate that there is much more to it 
than I comprehend... :oD

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841635#comment-16841635
 ] 

Matthias J. Sax commented on KAFKA-8315:


{quote}as the limit is on the fetch, and the queue cant put back-pressure on 
the fetch.
{quote}
Queues have a configurable size and partitions are pause if a queue fills up. 
Hence, setting `max.task.idle` large enough should be sufficient imho.
{quote}I also wondered whether you could allow the left(right) stream to 
process (in the absence of right(left records), provided the left(right) 
streamTime stays less than the right(left) streamTime -right(left) grace - 
windowSize. i.e. not to go beyond the start of the first currently active 
window on the right(left) stream.
{quote}
Not easily possible. At runtime we don't know anything about the semantics of 
the program. We don't know that a join is executed and also retention time is 
not a runtime concept.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841611#comment-16841611
 ] 

Andrew commented on KAFKA-8315:
---

Well, Im not sure that just fixing the start is sufficient. I suspect that if 
you run over a long enough period, that through sheer bad luck you will 
encounter an issue where a queue is drained, and yet also fails to fetch any 
records on the next fetch cycle, in this case you will end up with the same 
situation I think i.e. an empty queue that will still be processed because of 
the zero max idle time. This can happen on either side of the join, so if one 
side travels too far ahead of the other you would lose the join windows.

As I was rummaging through the code, I also wondered whether a closer 
relationship with the consumer would make things a bit easier. Then you can 
control better how many records you grab to push into the queues. For example, 
I'm not sure, but I kinda have a suspicion that the queues can grow quite 
large, as the limit is on the fetch, and the queue cant put back-pressure on 
the fetch. But, if you had control over the consumer, and you wanted to limit 
the size of the queue, then you would know if one side is full then you don't 
need to not fetch more records on that side etc. You would also potentially be 
able to use streamTime to make such decisions I suppose.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841571#comment-16841571
 ] 

John Roesler commented on KAFKA-8315:
-

Oh, man. I'm sorry that that I didn't convey the significance of that 
configuration enough. I just assumed you tried it. I feel bad for all the time 
you spent.

It's called "max idle time" and defaults to 0 because Streams actually has to 
resist processing data that it already has (aka, it has to idle) in order to 
wait for extra data. Streams would never idle before we added the config, and 
idling could have a severe impact on throughput for high-volume applications, 
so we basically can't default greater than 0.

Still, it seems like in a replay case like yours, it should at least wait until 
it polls all inputs at least once before starting, so I agree there's room for 
improvement here. I'm wondering if we should just take more control over the 
consumer and explicitly poll each topic, instead of just assigning them all and 
letting the consumer/broker decide which ones to give data back from first.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841449#comment-16841449
 ] 

Andrew commented on KAFKA-8315:
---

[~mjsax] [~vvcephei] [~ableegoldman] Thanks for your help on this, I think we 
are good now... Just doing a final test here, but looks promising

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841378#comment-16841378
 ] 

Matthias J. Sax commented on KAFKA-8315:


1) I think, 6719 should make it into 2.3 release.

2) It's actually a known issue: https://issues.apache.org/jira/browse/KAFKA-7458

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841273#comment-16841273
 ] 

Andrew commented on KAFKA-8315:
---

Ha!, you are absolutely right. Maybe I didn't understand the significance of 
this, or I think probably what happened was I misinterpreted it i..e I thought 
I needed to set it to zero, not non-zero. Either way we had a lot of other 
challenges last week which side-tracked me a lot from this investigation. 
Anyway, the deep-dive into kafka internals was well and truly worth it.

Two remaining points :

1) will [https://github.com/apache/kafka/pull/6719] make it into the next 
release do you think, as we have a lot of out-of-order data as well in our 
production feeds?
2) It doesn't seem obvious, that in order to do a historical join that the 
developer has to set a value called 'max task idle milliseconds'. If data is in 
the topic, why should I have to have an 'idle time'? Is there anything that can 
be done to make this more intuitive?

 

 

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841219#comment-16841219
 ] 

Matthias J. Sax commented on KAFKA-8315:


John mentioned the max task idle config earlier in this discussion: 
https://issues.apache.org/jira/browse/KAFKA-8315?focusedCommentId=16835832&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16835832
 
{quote}Perhaps this value should default to non-zero to enable historical joins 
by default?
{quote}
 
 It's not easy to change the default, because of backward compatibility.

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-16 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841213#comment-16841213
 ] 

Andrew commented on KAFKA-8315:
---

[~vvcephei] [~ableegoldman] Using the integration test I think I now understand 
what is going on. 

 

The key bit of code is here : 
[https://github.com/the4thamigo-uk/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L332]

 

What appears to be a happening is this :

 

1) Since the topics are already full of data, the left topic has sufficient 
data (1000 records) in order to trigger leaving this loop 
[https://github.com/the4thamigo-uk/kafka/blob/debugging/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L485.]
 So, no right records are fetched.

2) All the fetched left stream records are added to PartitionGroup, and 
PartitionGroup.allBuffered = false, since the right stream RecordQueue is still 
empty

3) The code drops into here 
[https://github.com/the4thamigo-uk/kafka/blob/debugging/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L330,]
 since maxTaskIdleMs == 0 !
4) The 1000 left stream records are processed (thereby almost immediately 
expiring their join windows !
5) The right stream records are fetched and processed, but there are no left 
stream join windows to join with until the latest records in the left stream 
for which the windows have not expired.

 

And the workaround/fix, is a change of configuration setting : 
[https://github.com/the4thamigo-uk/kafka/commit/189aa764aef06643a8a3c30b2aee3c4a29b82ae6]

Perhaps this value should default to non-zero to enable historical joins by 
default?

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-15 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840541#comment-16840541
 ] 

Andrew commented on KAFKA-8315:
---

[~vvcephei] [~ableegoldman] I have constructed an integration test that I think 
reproduces the issue. Altering the grace period to a large value (e.g. 1 hour) 
makes the test pass :

https://github.com/the4thamigo-uk/kafka/blob/debugging/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java#L100

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-14 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839711#comment-16839711
 ] 

Andrew commented on KAFKA-8315:
---

I constructed a super-ugly workaround for my \{{join-example}} demo app. It 
adds a transformer onto each of the left and right streams, and they refer to 
each other's streamTime to decide whether to forward the messages. Not the 
'right' solution, but I might be able to fix this up to serve as a workaround. 
Need to ensure that the right transformers pair up depending on their assigned 
partition, and maybe use a state store. Its a hack but it might just work for 
now.

 

https://github.com/the4thamigo-uk/join-example/pull/1/files

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Historical join issues

2019-05-14 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839703#comment-16839703
 ] 

Andrew commented on KAFKA-8315:
---

I constructed a super-ugly workaround for my \{{join-example}} demo app. It 
adds a transformer onto each of the left and right streams, and they refer to 
each other's streamTime to decide whether to forward the messages. Not the 
'right' solution, but I might be able to fix this up to serve as a workaround. 
Need to ensure that the right transformers pair up depending on their assigned 
partition, and maybe use a state store. Its a hack but it might just work for 
now.

 

https://github.com/the4thamigo-uk/join-example/pull/1/files

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals that the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is no where to pass a `Materialized` instance to the join 
> operation, only to the group operation is supported it seems.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads form the source topics 
> but it doesnt seem to make any difference : 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)