[ https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060 ]
Andrew commented on KAFKA-8317:
-------------------------------

[~vvcephei] Here is the code snippet

```
private static Topology joinStreamStreamNearest(final JoinerProperties props) {
    final JoinedRecordFactory joinedRecordFactory =
        JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper =
        JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor =
        AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor =
        AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();

    final Consumed<Object, GenericRecord> leftConsumed = Consumed.with(leftTsExtractor);
    final KStream<Object, GenericRecord> leftStream = AvroMinMaxTimestampTransformer.wrap(
        builder.stream(props.leftTopic().getName(), leftConsumed),
        props.minStreamTimestamp(),
        props.maxStreamTimestamp());

    final Consumed<Object, GenericRecord> rightConsumed = Consumed.with(rightTsExtractor);
    final KStream<Object, GenericRecord> rightStream = AvroMinMaxTimestampTransformer.wrap(
        builder.stream(props.rightTopic().getName(), rightConsumed),
        props.minStreamTimestamp(),
        props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner =
        AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
        .after(props.joinWindowAfterSize())
        .before(props.joinWindowBeforeSize())
        .grace(props.joinWindowGrace())
        .until(props.joinWindowRetention().toMillis()); // see https://issues.apache.org/jira/browse/KAFKA-8315

    final KStreamKStreamJoinFunction join =
        props.joinType() == JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream<Object, GenericRecord> joinStream = join.execute(rightStream, joiner, joinWindow)
        .transform(() -> new AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream<Object, GenericRecord> groupedStream = joinStream
        .groupByKey()
        .windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));

    final AvroLastAggregator lastAggregator =
        AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, rightTsExtractor);
    final Materialized<Object, GenericRecord, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<Object, GenericRecord, WindowStore<Bytes, byte[]>>with(null, null)
            .withRetention(props.groupWindowRetention());
    final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
        .aggregate(lastAggregator, lastAggregator, materialized);

    // write the change-log stream to the topic
    groupTable.toStream((k, v) -> k.key())
        .filter((k, v) -> joinedRecordFactory.isUpdated(v))
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());

    return builder.build();
}
```

> ClassCastException using KTable.suppress()
> ------------------------------------------
>
>                 Key: KAFKA-8317
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8317
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error:
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows:
> {Code}
> final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
>     .aggregate(lastAggregator, lastAggregator, materialized);
> final KTable<Windowed<Object>, GenericRecord> suppressedTable =
>     groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> // write the change-log stream to the topic
> suppressedTable.toStream((k, v) -> k.key())
>     .mapValues(joinValueMapper::apply)
>     .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... what am I doing wrong?
> Someone else has encountered the same issue:
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)