[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833161#comment-16833161
 ] 

Andrew commented on KAFKA-8315:
-------------------------------

I have rerun the join with the timestamp transform and the aggregation 
removed, and I still get the same behaviour, i.e. with the following topology:

{Code}
    private static Topology joinStreamStream(final JoinerProperties props) {

        final JoinedRecordFactory joinedRecordFactory = JoinedRecordFactory.create(
                props.leftTopic().getSchema(), props.rightTopic().getSchema());
        final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
        final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
        final JoinValueMapper joinValueMapper = JoinValueMapper.create(
                joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

        // extractors
        final TimestampExtractor leftTsExtractor =
                AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
        final TimestampExtractor rightTsExtractor =
                AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<Object, GenericRecord> leftConsumed = Consumed.with(leftTsExtractor);
        final KStream<Object, GenericRecord> leftStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.leftTopic().getName(), leftConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        final Consumed<Object, GenericRecord> rightConsumed = Consumed.with(rightTsExtractor);
        final KStream<Object, GenericRecord> rightStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.rightTopic().getName(), rightConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        // setup the join
        final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner =
                AvroFieldsValueJoiner.create(joinedRecordFactory);
        final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
                .after(props.joinWindowAfterSize())
                .before(props.joinWindowBeforeSize())
                .grace(props.joinWindowGrace());

        final KStreamKStreamJoinFunction join =
                props.joinType() == JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
        final KStream<Object, GenericRecord> joinStream = join.execute(rightStream, joiner, joinWindow);

        // write the change-log stream to the topic
        joinStream
                .mapValues(joinValueMapper::apply)
                .to(props.joinTopic());

        return builder.build();
    }
{Code}
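
For readers following along, here is a minimal plain-Java sketch of what the `JoinWindows.of(Duration.ZERO).after(...).before(...)` call in the topology above is meant to express, as I understand the documented semantics: a record from the other stream is joinable when its timestamp falls between the primary record's timestamp minus `before` and plus `after`. The class `JoinWindowSketch` and its methods are hypothetical and not part of Kafka Streams, the inclusive bounds are my reading of the behaviour, and grace and retention are deliberately left out since they are the open question here.

```java
import java.time.Duration;

// Hypothetical helper, NOT part of Kafka Streams: it only mirrors the
// before/after semantics of JoinWindows with plain arithmetic.
final class JoinWindowSketch {

    private final long beforeMs;
    private final long afterMs;

    JoinWindowSketch(final Duration before, final Duration after) {
        this.beforeMs = before.toMillis();
        this.afterMs = after.toMillis();
    }

    // Assumption: a record from the other stream is joinable when its
    // timestamp lies in [primaryTs - before, primaryTs + after], inclusive.
    boolean joinable(final long primaryTs, final long otherTs) {
        return otherTs >= primaryTs - beforeMs && otherTs <= primaryTs + afterMs;
    }

    // Total window size, i.e. before + after.
    long sizeMs() {
        return beforeMs + afterMs;
    }
}
```

With `before` of one minute and `after` of five minutes, a right-hand record up to one minute older or up to five minutes newer than the left-hand record would be considered for the join; anything outside that range would not, regardless of how long the underlying store retains it.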

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized`, not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems that only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
