[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278 ]
Andrew edited comment on KAFKA-8315 at 5/9/19 10:58 AM: --------------------------------------------------------

[~vvcephei] Thanks again for your help and advice.

Ok, so it should work as I had previously hoped, which suggests our working hypothesis may be incorrect. As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we suspect the right stream is getting ahead is that this would also explain the pattern we see: some joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end.

As for the retention period, I think I understand that now from all your explanations. The problem here is not retention; it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code, found how the RocksDB store works, and think we understand it now. What I didn't manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc').
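As a plain-Java illustration of the window bounds the test below configures (the helper class and method are hypothetical, not Kafka API): `JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D"))` means a left record at time t can only pair with right records timestamped in [t - 2 days, t], so a right stream running ahead of the left would never land inside the window.

```java
import java.time.Duration;

public class WindowBoundsCheck {
    // Hypothetical helper mirroring JoinWindows.of(Duration.ZERO).before(P2D):
    // a right record joins a left record when rightTs lies in
    // [leftTs - beforeMs, leftTs + afterMs].
    static boolean inWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    public static void main(String[] args) {
        long beforeMs = Duration.parse("P2D").toMillis();
        long afterMs = Duration.ZERO.toMillis();
        long leftTs = 1_557_400_000_000L;

        // Right record one day older than the left record: inside the window.
        System.out.println(inWindow(leftTs, leftTs - Duration.ofDays(1).toMillis(), beforeMs, afterMs)); // true
        // Right record even 1 ms newer than the left record: outside, since after = 0.
        System.out.println(inWindow(leftTs, leftTs + 1, beforeMs, afterMs)); // false
    }
}
```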
{Code}
private static Topology joinTestStreamStream(final JoinerProperties props) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Logs topic, partition, timestamp and offset for every record read,
    // so the lag between the two input topics can be measured.
    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
        () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                    context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream =
        builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream =
        builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

    // Set up the join: window is [leftTs - 2 days, leftTs], with a 7-day grace period.
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
        .before(Duration.parse("P2D"))
        .grace(Duration.parse("P7D"));

    final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream, (l, r) -> {
        log.info("joining: " + l + ", " + r);
        return null;
    }, joinWindow);

    return builder.build();
}
{Code}

> Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
> ------------------------------------------------------------------------------------------------------
>
>          Key: KAFKA-8315
>          URL: https://issues.apache.org/jira/browse/KAFKA-8315
>      Project: Kafka
>   Issue Type: Bug
>   Components: streams
>     Reporter: Andrew
>     Assignee: John Roesler
>     Priority: Major
>  Attachments: code.java
>
> The documentation says to use `Materialized`, not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; only the group operation seems to support it.
>
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
>
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
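For context on the issue title: since KStream#join accepts no Materialized, the join store's retention cannot be set independently; it is derived from the window definition as windowSize + grace. A sketch of that arithmetic (the helper class is hypothetical; the relationship matches the window-store sizing described in the comments):

```java
import java.time.Duration;

public class JoinStoreRetention {
    // The window store backing a stream-stream join keeps records for roughly
    // windowSize + grace, where windowSize = before + after. This just does
    // that arithmetic for the window used in the comment above.
    static Duration retention(Duration before, Duration after, Duration grace) {
        return before.plus(after).plus(grace);
    }

    public static void main(String[] args) {
        // before = 2 days, after = 0, grace = 7 days -> retention = 9 days.
        Duration r = retention(Duration.parse("P2D"), Duration.ZERO, Duration.parse("P7D"));
        System.out.println(r.toDays()); // 9
    }
}
```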