[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833161#comment-16833161 ]
Andrew commented on KAFKA-8315: ------------------------------- I have run a join by removing the timestamp transform and the aggregation and I still get the same behaviour. i.e. the following topology : {Code} private static Topology joinStreamStream(final JoinerProperties props) { final JoinedRecordFactory joinedRecordFactory = JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema()); final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields()); final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields()); final JoinValueMapper joinValueMapper = JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema()); // extractors final TimestampExtractor leftTsExtractor = AvroTimestampExtractor.create(props.leftTopic().getTimestampField()); final TimestampExtractor rightTsExtractor = AvroTimestampExtractor.create(props.rightTopic().getTimestampField()); final StreamsBuilder builder = new StreamsBuilder(); final Consumed<Object, GenericRecord> leftConsumed = Consumed.with(leftTsExtractor); final KStream<Object, GenericRecord> leftStream = AvroMinMaxTimestampTransformer.wrap( builder.stream(props.leftTopic().getName(), leftConsumed), props.minStreamTimestamp(), props.maxStreamTimestamp()); final Consumed<Object, GenericRecord> rightConsumed = Consumed.with(rightTsExtractor); final KStream<Object, GenericRecord> rightStream = AvroMinMaxTimestampTransformer.wrap( builder.stream(props.rightTopic().getName(), rightConsumed), props.minStreamTimestamp(), props.maxStreamTimestamp()); // setup the join final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner = AvroFieldsValueJoiner.create(joinedRecordFactory); final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO).after(props.joinWindowAfterSize()).before(props.joinWindowBeforeSize()).grace(props.joinWindowGrace()); final KStreamKStreamJoinFunction join = props.joinType() == JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin; final KStream<Object, GenericRecord> joinStream = join.execute(rightStream, joiner, joinWindow); // write the change-log stream to the topic joinStream .mapValues(joinValueMapper::apply) .to(props.joinTopic()); return builder.build(); } {Code} > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > ----------------------------------------------------------------------------------------------------- > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Reporter: Andrew > Assignee: John Roesler > Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)