[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838878#comment-16838878
 ] 

John Roesler commented on KAFKA-8315:
-

[~the4thamigo_uk], the Python tests are something different.

I was just talking about the Java classes with names like 
"WhateverWhateverIntegrationTest". You should be able to run those right from 
the IDE. If you want to run all the streams integration tests, it's 
`./gradlew clean :streams:test`. It will take 7 minutes-ish.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838783#comment-16838783
 ] 

Andrew commented on KAFKA-8315:
---

Thanks [~vvcephei], I am running the unit tests in the Kafka project. I confess 
I haven't worked out the integration tests yet, but I saw the Python scripts for 
this. I agree the ordering from the RecordQueue/PartitionGroup looks sound, and 
that it is something weird with how data is pushed into the RecordQueues from 
the source topics. [~ableegoldman] any clues on this would be greatly 
appreciated.

 

Thanks again for the responses. Hopefully, we will get to the bottom of this.



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838720#comment-16838720
 ] 

John Roesler commented on KAFKA-8315:
-

Hi [~the4thamigo_uk],

Unfortunately, the TopologyTestDriver is going to be insufficient for 
exercising the behavior you want, since it processes events synchronously as 
soon as you call `pipeInput`, but the problem you're having appears to be with 
the logic that chooses records polled from Kafka (which only KafkaStreams does).

I'd suggest, as the fastest way to try and nail this down, actually to pull the 
Kafka project down (since we have set up integration tests that actually do use 
the brokers and run a "real" KafkaStreams) and modify one of the join 
integration tests to reproduce your use case.

This still sounds like a bug to me, even though it might not be the one that 
[~ableegoldman] reported.

Regarding the ticket, it'd be better not to split the history of this 
investigation, so I recommend just editing the title and description of the 
ticket, instead of making a new ticket.

Thanks,
-John



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838565#comment-16838565
 ] 

Andrew commented on KAFKA-8315:
---

This is the test enhanced to use timestamp extraction, and it works:

https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838550#comment-16838550
 ] 

Andrew commented on KAFKA-8315:
---

This test appears to work OK: 
[https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-13 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838471#comment-16838471
 ] 

Andrew commented on KAFKA-8315:
---

[~vvcephei] [~ableegoldman] I was just looking at the unit tests for 
PartitionGroup and noticed that the comment refers to timestamps, but the 
ConsumerRecord constructor that is used passes the value in the offset 
parameter:

[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java#L92]

Is this correct?



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-12 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838012#comment-16838012
 ] 

Andrew commented on KAFKA-8315:
---

[~ableegoldman] Right, I see what you mean, this loop only goes until the head 
is found: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L158]
We do have out-of-order data in our real data streams; however, it looks like 
you are right that it shouldn't affect my {{join-example}} demo, which 
reproduces the issue with only ordered data. Any further ideas on why the 
{{join-example}} doesn't work?



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837765#comment-16837765
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] After I directed you to check out RecordQueue I went back to 
look over it and filed the ticket John linked: you're right, it actually was 
going by partition time rather than by the timestamp of the head record. I've 
opened a simple PR with the fix 
[here|https://github.com/apache/kafka/pull/6719].

That said, I'm not sure this actually affects your use case. If all the data is 
in order, partition time should be the same as the head record's timestamp, so 
this should only come into play when processing out of order data. In your 
example above, partition A would have streamtime = 1 when first choosing the 
next record, as it will not have seen the record with timestamp 4 yet.
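
The point about ordered data can be illustrated with a toy model. This is only a sketch, not the Kafka Streams code: it assumes partition time is the highest timestamp among the records that have reached the head of the queue so far, and the class and method names (`PartitionTimeDemo`, `trace`, `alwaysEqual`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; not the real RecordQueue.
class PartitionTimeDemo {
    // For each record, in arrival order, returns {headTimestamp, partitionTime}
    // at the moment that record becomes the head of the queue.
    static List<long[]> trace(long... timestamps) {
        List<long[]> out = new ArrayList<>();
        long partitionTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Assumption: partition time is the maximum head timestamp seen so far.
            partitionTime = Math.max(partitionTime, ts);
            out.add(new long[] {ts, partitionTime});
        }
        return out;
    }

    // True if partition time equals the head timestamp for every record.
    static boolean alwaysEqual(long... timestamps) {
        return trace(timestamps).stream().allMatch(p -> p[0] == p[1]);
    }

    public static void main(String[] args) {
        System.out.println("in order 1,2,3,4: " + alwaysEqual(1, 2, 3, 4));     // true
        System.out.println("out of order 1,4,2,3: " + alwaysEqual(1, 4, 2, 3)); // false
    }
}
```

Under that assumption the two values only diverge once a record arrives with a timestamp lower than one already seen, which matches the claim that the fix should matter only for out-of-order data.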



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837303#comment-16837303
 ] 

John Roesler commented on KAFKA-8315:
-

Ah, yes. My apologies. I was mistaken about the PartitionGroup logic. I think 
you're right, and actually, [~ableegoldman] has filed 
https://issues.apache.org/jira/browse/KAFKA-8347, so I guess she agrees, too.

I think it's actually a pretty straightforward change, and I actually don't 
think it needs a KIP either. Should we try to get this change in for the 15 May 
feature freeze for 2.3?

Since you've taken the time to get familiar with the code, do you want to send 
a PR, [~the4thamigo_uk]? Offhand, I think we can just redefine 
RecordQueue.timestamp() to be the head timestamp instead of the high water-mark 
time. And, of course, write/update the relevant tests.



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-10 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086
 ] 

Andrew commented on KAFKA-8315:
---

[~ableegoldman] Can I sanity check my understanding here? From what I can tell, 
the {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by the 
{{RecordQueue.partitionTime}}. The {{RecordQueue.partitionTime}} is the most 
recent timestamp that has been read into the {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and 
another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will 
have {{RecordQueue.partitionTime = 4}} and B will have 
{{RecordQueue.partitionTime = 3}}. So B will be selected and the next record 
will be 2, not 1. Is that right?
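
The scenario above can be written down as a small self-contained model. This is a sketch under the assumption stated in this comment (partition time is the highest timestamp buffered in the queue); `ToyRecordQueue` and `QueueSelectionDemo` are hypothetical names and are not the Kafka Streams classes.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// Toy model only: NOT the Kafka Streams RecordQueue.
class ToyRecordQueue {
    final String name;
    final Deque<Long> fifo = new ArrayDeque<>();
    // Assumption being checked: highest timestamp buffered so far.
    long partitionTime = Long.MIN_VALUE;

    ToyRecordQueue(String name, long... timestamps) {
        this.name = name;
        for (long ts : timestamps) {
            fifo.addLast(ts);
            partitionTime = Math.max(partitionTime, ts);
        }
    }

    long headTimestamp() {
        return fifo.peekFirst();
    }
}

class QueueSelectionDemo {
    // Returns the timestamp of the first record processed when the queues
    // A = [1, 4] and B = [2, 3] are prioritised by the given ordering.
    static long firstRecord(ToLongFunction<ToyRecordQueue> ordering) {
        Comparator<ToyRecordQueue> byTime = Comparator.comparingLong(ordering);
        PriorityQueue<ToyRecordQueue> nonEmptyQueuesByTime = new PriorityQueue<>(byTime);
        nonEmptyQueuesByTime.add(new ToyRecordQueue("A", 1L, 4L));
        nonEmptyQueuesByTime.add(new ToyRecordQueue("B", 2L, 3L));
        return nonEmptyQueuesByTime.poll().fifo.pollFirst();
    }

    public static void main(String[] args) {
        // Ordering by partition time picks B first, so record 2 is processed before record 1.
        System.out.println("by partitionTime: " + firstRecord(q -> q.partitionTime));         // 2
        // Ordering by head timestamp picks A first, so record 1 is processed first.
        System.out.println("by head timestamp: " + firstRecord(ToyRecordQueue::headTimestamp)); // 1
    }
}
```

If the assumption holds, the two orderings disagree on exactly this input, which is the behaviour being asked about.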



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836735#comment-16836735
 ] 

Andrew commented on KAFKA-8315:
---

Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data, x and y: x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat).

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836606#comment-16836606
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] The code you're looking for describing the logic of choosing 
the next record to process is in 
org.apache.kafka.streams.processor.internals.PartitionGroup, which contains a 
priority queue "nonEmptyQueuesByTime" that serves up the partition with the 
lowest timestamp when polled (unless max.task.idle.ms has passed, as checked in 
StreamTask#isProcessable).
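
As a rough illustration of the idea (not the actual PartitionGroup code), the sketch below keeps two buffered sides in a priority queue keyed on the timestamp of each side's head record and always processes the side with the lowest one. `ToySide` and `MergeDemo` are hypothetical names, and the idle-time handling is left out entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Toy stand-in for one buffered input partition.
class ToySide {
    final String name;
    final Deque<Long> records = new ArrayDeque<>();

    ToySide(String name, long... timestamps) {
        this.name = name;
        for (long ts : timestamps) {
            records.addLast(ts);
        }
    }

    long head() {
        return records.peekFirst();
    }
}

class MergeDemo {
    // Repeatedly processes the side whose head record has the lowest timestamp.
    static List<String> processingOrder(ToySide... sides) {
        Comparator<ToySide> byHead = Comparator.comparingLong(ToySide::head);
        PriorityQueue<ToySide> nonEmptyQueuesByTime = new PriorityQueue<>(byHead);
        for (ToySide side : sides) {
            if (!side.records.isEmpty()) {
                nonEmptyQueuesByTime.add(side);
            }
        }
        List<String> order = new ArrayList<>();
        while (!nonEmptyQueuesByTime.isEmpty()) {
            // Remove the side before mutating it so the heap ordering stays valid.
            ToySide next = nonEmptyQueuesByTime.poll();
            order.add(next.name + "@" + next.records.pollFirst());
            if (!next.records.isEmpty()) {
                nonEmptyQueuesByTime.add(next); // re-insert, keyed on the new head
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // A frequent left side and a sparse right side.
        System.out.println(processingOrder(
                new ToySide("left", 1, 2, 4, 5),
                new ToySide("right", 3, 6)));
        // [left@1, left@2, right@3, left@4, left@5, right@6]
    }
}
```

With both sides fully buffered, this selection rule yields a single timestamp-ordered sequence, so neither side gets ahead of the other in event time.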



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278
 ] 

Andrew commented on KAFKA-8315:
---

Ok, so it should work as I had previously hoped, so maybe our working 
hypothesis is incorrect then. As you suggest, I am currently re-running a test 
using the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, found out how the RocksDB code works, and think we 
understand it now. What I didn't manage to find is the code for the logic you 
describe in your first paragraph ('Streams should choose to ...etc').

 

 



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835832#comment-16835832
 ] 

John Roesler commented on KAFKA-8315:
-

Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Streams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat, that when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but instead just process the side that does have data. This is 
controlled by the "max.task.idle.ms" config, but since you're processing 
historically, this shouldn't be your problem. Still might be worth a look.
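
For reference, a minimal sketch of setting that idle-time config, assuming the setting meant here is `max.task.idle.ms` (the name used by Kafka Streams 2.1 and later). The `application.id` and `bootstrap.servers` values are placeholders, and `IdleConfigDemo` is a hypothetical helper, not part of the join-example project.

```java
import java.util.Properties;

class IdleConfigDemo {
    // Builds Streams properties with the given idle time in milliseconds.
    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        // Placeholder values, assumed for illustration only.
        props.put("application.id", "join-example");
        props.put("bootstrap.servers", "localhost:9092");
        // How long a task waits for data on an empty input partition
        // before processing the partitions that do have data.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(5000L).getProperty("max.task.idle.ms")); // 5000
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor along with the topology.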

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.
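
The two-producer case can be sketched with a toy model. It assumes stream time is the highest timestamp seen so far and uses a simplified drop rule, `timestamp < streamTime - grace`, which ignores the window size, so it is not the exact check Streams performs. Timestamps are in hours and `GraceDemo` is a hypothetical name.

```java
class GraceDemo {
    // Two producers each write hourly records for hours 0..23, one after the
    // other. Returns how many records fall behind streamTime - grace.
    static int dropped(long graceHours) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (int producer = 0; producer < 2; producer++) {
            for (long ts = 0; ts < 24; ts++) {
                // Stream time only moves forward; it "freezes" at 23 during the second pass.
                streamTime = Math.max(streamTime, ts);
                if (ts < streamTime - graceHours) {
                    dropped++;
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("grace 1h:  " + dropped(1));  // 22
        System.out.println("grace 24h: " + dropped(24)); // 0
    }
}
```

In this model a one-hour grace loses most of the second producer's records, while a grace of at least the maximum skew keeps them all.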

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore. 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.

Minor point, you should not need to mess with the retention of the changelog 
topic. Streams sets this appropriately to preserve the same data as the store, 
but this is only apparent when restoring the store. The actual results of the 
join are served out of the state store, so only the state store's retention 
matters. This is what you're setting with the grace period.

I hope this helps!
-John



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835784#comment-16835784
 ] 

Andrew commented on KAFKA-8315:
---

After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
left and right streams, but we believe that the right topic is being read at a 
rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time) and therefore the right stream windows are being expired before 
the left stream data has been read.

[~vvcephei] What controls the rate that records are read from the left and 
right streams? Is there any guarantee that the timestamps for the records in 
the left stream are kept more-or-less in line with the records from the right 
stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833161#comment-16833161
 ] 

Andrew commented on KAFKA-8315:
---

I have run the join with the timestamp transform and the aggregation removed, 
and I still get the same behaviour, i.e. with the following topology:

{Code}
    private static Topology joinStreamStream(final JoinerProperties props) {

        final JoinedRecordFactory joinedRecordFactory =
                JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema());
        final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
        final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
        final JoinValueMapper joinValueMapper =
                JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

        // extractors
        final TimestampExtractor leftTsExtractor =
                AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
        final TimestampExtractor rightTsExtractor =
                AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed leftConsumed = Consumed.with(leftTsExtractor);
        final KStream leftStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.leftTopic().getName(), leftConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        final Consumed rightConsumed = Consumed.with(rightTsExtractor);
        final KStream rightStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.rightTopic().getName(), rightConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        // setup the join
        final ValueJoiner joiner = AvroFieldsValueJoiner.create(joinedRecordFactory);
        final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
                .after(props.joinWindowAfterSize())
                .before(props.joinWindowBeforeSize())
                .grace(props.joinWindowGrace());

        final KStreamKStreamJoinFunction join = props.joinType() == JoinerProperties.JoinType.INNER
                ? leftStream::join
                : leftStream::leftJoin;
        final KStream joinStream = join.execute(rightStream, joiner, joinWindow);

        // write the change-log stream to the topic
        joinStream
                .mapValues(joinValueMapper::apply)
                .to(props.joinTopic());

        return builder.build();
    }
{Code}



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833088#comment-16833088
 ] 

Andrew commented on KAFKA-8315:
---

I've attached my code:
 * I have logging in the ValueJoiner, so I can see that joins do not occur 
before the period mentioned.
 * The ValueJoiner simply puts the entire left and right GenericRecord objects 
into a parent GenericRecord, e.g.

{Code}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{Code}
 * I have a perhaps unusual transform on line 33, which ensures that the joined 
record always has the timestamp of the left record. This is important for the 
aggregation phase.



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577
 ] 

Andrew commented on KAFKA-8315:
---

[~vvcephei] Update: I ran with grace = 20 days and I now see records from around 
23 days prior to the latest data in the topics. So it seems that we definitely 
can join the records, but the grace period is the critical factor in doing this 
for a historical ingest. 

So, for a 2 year ingestion, do I have to set my grace to 2 years? That seems a 
bit strange, and possibly inefficient.



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543
 ] 

Andrew commented on KAFKA-8315:
---

[~vvcephei] I've been struggling with this today. 


 * I have logging on every join, and I can see that joins do not occur before 
the 6 day period, except in a few cases (see below).
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have set `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and now only set the window 
size to 2 days, with the grace increased to 3 days.

None of the above seems to make any difference.

What I have observed is that I get a few joins before the 6 day window for a 
single partition and this partition is the first to complete as it has the 
fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked that 
the same keys are in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and that the work for these tasks is distributed across the stream threads in 
our 4 streams application instances. My assumption is that stream time is 
per-task (i.e. per partition). Is this correct? Is there _any_ possibility that 
the stream time of partition 13 is somehow shared with any of the other tasks, 
such that windows might be closed before the join-able data is read on the 
other partitions?

I'd like to understand more about how stream time increases. I imagine 
(probably naively) that within a task, stream time advances to the latest 
timestamp read from a partition, and that the left and right streams each have 
their own stream time. I also assume that during a join, the left stream is 
read up until just after the current right stream time, then the right stream 
is read up until the latest left stream time, so that data is pulled off both 
streams in a way that minimizes the difference in time between the latest 
records read off the topics. Is this near the mark?
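
To make the first part of that mental model concrete, here is a minimal sketch of "stream time advances to the latest timestamp read". It only illustrates the assumption that stream time is the largest record timestamp observed so far and never moves backwards. `StreamTimeTracker` is a hypothetical name for illustration, not a Kafka Streams class, and the sketch says nothing about how Kafka Streams actually interleaves the two inputs.

```java
/** Hypothetical illustration of the mental model above; not Kafka Streams internals. */
final class StreamTimeTracker {

    // Largest record timestamp observed so far, in epoch milliseconds.
    private long streamTimeMs = Long.MIN_VALUE;

    /** Records one observed timestamp and returns the resulting stream time. */
    long observe(long recordTimestampMs) {
        // Stream time only moves forward: an out-of-order record does not rewind it.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    long current() {
        return streamTimeMs;
    }

    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker();
        System.out.println(tracker.observe(100L)); // 100
        System.out.println(tracker.observe(300L)); // 300
        System.out.println(tracker.observe(200L)); // 300 (out-of-order record, no rewind)
    }
}
```

Under this assumption, a single record with a much later timestamp is enough to move stream time far ahead of the older records that have yet to be read.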

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-03 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832325#comment-16832325
 ] 

Andrew commented on KAFKA-8315:
---

The fudge factor is `windowstore.changelog.additional.retention.ms`, which 
is set to 24h by default, so that seems to add up to 5 days (120 hours).

You understand my use case well. The choice of limiting the grace was to reduce 
the performance overhead during the large historical processing.

I definitely have lots of data beyond this that should join and the joins and 
aggregation within the latest 120 hours seem to be correct and complete. This 
seems to be just related to retention. I will experiment with increasing 
`windowstore.changelog.additional.retention.ms` to see if it brings in more 
data. 

One oddity of our left stream is that it contains records in batches from 
different devices. Each batch is about 1000 records and is contiguous within 
the stream. Within a batch, the records are in increasing timestamp order. 
Subsequent batches from different devices will be within 1-2 days of the 
previous batch (we don't have, say, a batch for 1/1/2019 followed by a batch 
for 4/1/2019 or a batch for 24/12/2019). The right stream consists of single 
records, not batches, with similar date ordering (i.e. subsequent records 
should be within 1-2 days of each other).

My understanding is that the windows are closed when 
`streamtime - windowstart > windowsize + grace`. So, as stream time increases 
with the arrival of newer batches, joins/aggregations should continue until a 
batch arrives that is later than windowsize + grace. However, we are not seeing 
this.
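
To spell out the closing condition I have in mind, here is a minimal sketch. `WindowCloseCheck` and `isClosed` are hypothetical names used only for illustration. This is my reading of the rule, not the actual Kafka Streams implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of the closing rule stated above; not Kafka Streams internals. */
final class WindowCloseCheck {

    /** True when streamTime - windowStart > windowSize + grace. */
    static boolean isClosed(Instant streamTime, Instant windowStart,
                            Duration windowSize, Duration grace) {
        Duration elapsed = Duration.between(windowStart, streamTime);
        return elapsed.compareTo(windowSize.plus(grace)) > 0;
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("2019-01-01T00:00:00Z");
        Duration size = Duration.ofDays(2);
        Duration grace = Duration.ofDays(2);

        // Stream time 3 days after the window start: still within size + grace (4 days).
        System.out.println(isClosed(windowStart.plus(Duration.ofDays(3)), windowStart, size, grace)); // false
        // Stream time 5 days after the window start: beyond size + grace, so the window is closed.
        System.out.println(isClosed(windowStart.plus(Duration.ofDays(5)), windowStart, size, grace)); // true
    }
}
```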



[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-02 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831927#comment-16831927
 ] 

John Roesler commented on KAFKA-8315:
-

Interesting. Thanks for the context. For some reason, the slack link doesn't 
take me to the thread.

Your understanding is almost spot-on.

The grace period defines how long after the window ends it will accept 
late-arriving records and update the results. The retention time defines how 
long we keep the state of the window in storage. Clearly, the retention time 
must be at least big enough to support the updates that may happen during the 
grace period, but it could be much larger, to support Interactive Queries even 
after the window is closed to updates.

Join windows are a little bit different, because they are not queryable. 
Therefore, there is no reason to have any retention beyond the grace period. 
This is also why there's no `Materialized` parameter. The state for the join is 
purely bookkeeping, not a "materialized view" in the data processing sense. 
Since you mention the apparent similarity with grouped aggregations, the fact 
that JoinWindows shares a class hierarchy with Windows, and the fact that it 
uses a "normal" WindowStore internally, was mostly for implementation 
convenience. It's actually a little semantically abusive if you really get into 
it, and I've heard a few times that people would like to break joins out and 
clean the whole situation up.

Things get really complicated when we need to compute defaults, though. The 
concepts of "grace" and "retention" used to be coupled into just "retention" 
(aka "until" aka "maintainMs"), and the default was set to 24h. So, if we pick 
any default grace period shorter than 24h, then some apps may start to drop 
late data that didn't before. Also, the "retention" configuration in Windows is 
only deprecated, not removed, so someone may set a retention time on the 
deprecated methods, but not a grace period, and we also need to do the "right 
thing" in that case. This is just in the way of justifying why the code is so 
complicated. Hopefully, we can drop the deprecated methods soonish and clean 
the whole thing up.

So, back to your actual behavior, you *should* see that the window stores for 
join windows use `Duration.ofMillis(windows.size() + windows.gracePeriodMs())`, 
as you pointed out above. The deprecated `until` should be ignored. It's 
possible the topic retention doesn't get updated when you change your configs, 
which would be a bug.

One thing I didn't understand is the arithmetic from your conversation. I'll 
take a shot and maybe you can set me straight...
You want to join 2 years of historical data.
For each join candidate, you only want to look back 2 days, so you set the join 
window to size=2 days.
You want to emit updated join results in the case of time-disordered records, 
but not indefinitely. Specifically, you only want to emit updated results up to 
2 days after the fact, so you set the grace period to 2 days as well.
With these configurations, you should see the retention time on the topic set 
to at least 4 days. 120 hours is 5 days, so this seems about right to me (there 
might be some fudge factor, I'm not sure).
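
As a rough check of that arithmetic, here is a minimal sketch. `JoinRetentionEstimate` and its methods are hypothetical names, not Kafka Streams APIs. The 24-hour term is an assumption: it supposes the fudge factor is the default value of `windowstore.changelog.additional.retention.ms`.

```java
import java.time.Duration;

/** Back-of-the-envelope retention arithmetic; hypothetical helper, not a Kafka Streams API. */
final class JoinRetentionEstimate {

    /** Retention the join's window store needs: window size plus grace period. */
    static Duration storeRetention(Duration windowSize, Duration grace) {
        return windowSize.plus(grace);
    }

    /** Changelog topic retention: store retention plus the additional changelog retention. */
    static Duration changelogRetention(Duration windowSize, Duration grace, Duration additional) {
        return storeRetention(windowSize, grace).plus(additional);
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofDays(2);
        Duration grace = Duration.ofDays(2);
        Duration additional = Duration.ofHours(24); // assumed default additional retention

        System.out.println(storeRetention(windowSize, grace).toHours());                 // 96
        System.out.println(changelogRetention(windowSize, grace, additional).toHours()); // 120
    }
}
```

If that assumption holds, 4 days of store retention plus 1 day of additional changelog retention would account for the 120 hours observed on the topic.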

I guess the big problem is that your data fails to join. I'd start with 
identifying some pair of records that you think *should* join, and then 
identify why they don't (could be a join window too small, or it could be the 
grace period too small).
