[ https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060 ]

Andrew commented on KAFKA-8317:
-------------------------------

[~vvcephei] Here is the code snippet:

 

```

    private static Topology joinStreamStreamNearest(final JoinerProperties props) {

        final JoinedRecordFactory joinedRecordFactory = JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema());
        final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
        final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
        final JoinValueMapper joinValueMapper = JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

        // extractors
        final AvroTimestampExtractor leftTsExtractor = AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
        final AvroTimestampExtractor rightTsExtractor = AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<Object, GenericRecord> leftConsumed = Consumed.with(leftTsExtractor);
        final KStream<Object, GenericRecord> leftStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.leftTopic().getName(), leftConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        final Consumed<Object, GenericRecord> rightConsumed = Consumed.with(rightTsExtractor);
        final KStream<Object, GenericRecord> rightStream = AvroMinMaxTimestampTransformer.wrap(
                builder.stream(props.rightTopic().getName(), rightConsumed),
                props.minStreamTimestamp(), props.maxStreamTimestamp());

        // set up the join
        final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner = AvroFieldsValueJoiner.create(joinedRecordFactory);
        final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
                .after(props.joinWindowAfterSize())
                .before(props.joinWindowBeforeSize())
                .grace(props.joinWindowGrace())
                .until(props.joinWindowRetention().toMillis()); // see https://issues.apache.org/jira/browse/KAFKA-8315

        final KStreamKStreamJoinFunction join = props.joinType() == JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
        final KStream<Object, GenericRecord> joinStream = join.execute(rightStream, joiner, joinWindow)
                .transform(() -> new AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

        // set up the grouping
        final TimeWindowedKStream<Object, GenericRecord> groupedStream = joinStream
                .groupByKey()
                .windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
        final AvroLastAggregator lastAggregator = AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, rightTsExtractor);

        final Materialized<Object, GenericRecord, WindowStore<Bytes, byte[]>> materialized =
                Materialized.<Object, GenericRecord, WindowStore<Bytes, byte[]>>with(null, null)
                        .withRetention(props.groupWindowRetention());

        final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
                .aggregate(lastAggregator, lastAggregator, materialized);

        // write the change-log stream to the topic
        groupTable.toStream((k, v) -> k.key())
                .filter((k, v) -> joinedRecordFactory.isUpdated(v))
                .mapValues(joinValueMapper::apply)
                .to(props.joinTopic());

        return builder.build();
    }

```
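For what it's worth, the stack trace in the report below shows the default key serde (a StringSerializer) being handed the Windowed key that suppress() buffers: Materialized.with(null, null) leaves serde resolution to the runtime defaults. A minimal, self-contained sketch of that failure mode follows. The Serializer, StringSerializer, and Windowed classes here are simplified stand-ins for illustration, not the real Kafka classes:

```java
// Simplified stand-ins for the Kafka types involved (NOT the real classes).
interface Serializer<T> {
    byte[] serialize(T data);
}

// Mirrors org.apache.kafka.common.serialization.StringSerializer:
// it assumes every key it sees is a String.
class StringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(final String data) {
        return data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}

// Stand-in for org.apache.kafka.streams.kstream.Windowed.
class Windowed<K> {
    final K key;
    Windowed(final K key) { this.key = key; }
}

public class SuppressCastDemo {
    public static void main(final String[] args) {
        // The suppression buffer serializes the *windowed* key, but the
        // serde it looked up is the default String serde. After erasure,
        // the raw cast below compiles, and the type check only happens
        // when serialize() is invoked with a non-String key.
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Serializer<Object> keySerializer = (Serializer) new StringSerializer();
        try {
            keySerializer.serialize(new Windowed<>("some-key"));
            System.out.println("no exception");
        } catch (final ClassCastException e) {
            // Same shape as the reported trace: Windowed cannot be cast to String.
            System.out.println("ClassCastException, as in the report");
        }
    }
}
```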

> ClassCastException using KTable.suppress()
> ------------------------------------------
>
>                 Key: KAFKA-8317
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8317
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error:
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows:
> {Code}
>         final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
>                 .aggregate(lastAggregator, lastAggregator, materialized);
>         final KTable<Windowed<Object>, GenericRecord> suppressedTable = groupTable
>                 .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>         // write the change-log stream to the topic
>         suppressedTable.toStream((k, v) -> k.key())
>                 .mapValues(joinValueMapper::apply)
>                 .to(props.joinTopic());
> {Code}
> The code without `suppressedTable` works... what am I doing wrong?
> Someone else has encountered the same issue:
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation: https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
