[ https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060 ]
Andrew commented on KAFKA-8317:
-------------------------------
[~vvcephei] Here is the code snippet:
```
private static Topology joinStreamStreamNearest(final JoinerProperties props) {
    final JoinedRecordFactory joinedRecordFactory =
        JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper =
        JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor =
        AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor =
        AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();

    final Consumed<Object, GenericRecord> leftConsumed = Consumed.with(leftTsExtractor);
    final KStream<Object, GenericRecord> leftStream =
        AvroMinMaxTimestampTransformer.wrap(
            builder.stream(props.leftTopic().getName(), leftConsumed),
            props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed<Object, GenericRecord> rightConsumed = Consumed.with(rightTsExtractor);
    final KStream<Object, GenericRecord> rightStream =
        AvroMinMaxTimestampTransformer.wrap(
            builder.stream(props.rightTopic().getName(), rightConsumed),
            props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner<GenericRecord, GenericRecord, GenericRecord> joiner =
        AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
        .after(props.joinWindowAfterSize())
        .before(props.joinWindowBeforeSize())
        .grace(props.joinWindowGrace())
        .until(props.joinWindowRetention().toMillis()); // see https://issues.apache.org/jira/browse/KAFKA-8315
    final KStreamKStreamJoinFunction join =
        props.joinType() == JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream<Object, GenericRecord> joinStream =
        join.execute(rightStream, joiner, joinWindow)
            .transform(() -> new AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream<Object, GenericRecord> groupedStream =
        joinStream
            .groupByKey()
            .windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
    final AvroLastAggregator lastAggregator =
        AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, rightTsExtractor);
    final Materialized<Object, GenericRecord, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<Object, GenericRecord, WindowStore<Bytes, byte[]>>with(null, null)
            .withRetention(props.groupWindowRetention());
    final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
        .aggregate(lastAggregator, lastAggregator, materialized);

    // write the change-log stream to the topic
    groupTable.toStream((k, v) -> k.key())
        .filter((k, v) -> joinedRecordFactory.isUpdated(v))
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());

    return builder.build();
}
```
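
For readers following the join setup above: `JoinWindows.of(Duration.ZERO)` followed by `.after(...)` and `.before(...)` bounds how far apart the two record timestamps may be. Below is a minimal, Kafka-free sketch of that check, assuming inclusive bounds on both sides. `JoinWindowSketch` and `withinWindow` are hypothetical names for illustration only and are not part of Kafka Streams or of the snippet above.

```java
import java.time.Duration;

// Hypothetical helper (not a Kafka Streams class): models the timestamp
// bounds configured through JoinWindows.before(...) and .after(...).
final class JoinWindowSketch {

    // Returns true when the right-hand record's timestamp lies inside
    // [leftTs - before, leftTs + after], both bounds inclusive (assumed).
    static boolean withinWindow(final long leftTs, final long rightTs,
                                final Duration before, final Duration after) {
        return rightTs >= leftTs - before.toMillis()
            && rightTs <= leftTs + after.toMillis();
    }

    public static void main(final String[] args) {
        final Duration before = Duration.ofSeconds(5);
        final Duration after = Duration.ofSeconds(1);

        System.out.println(withinWindow(10_000L, 6_000L, before, after));  // true
        System.out.println(withinWindow(10_000L, 4_000L, before, after));  // false
        System.out.println(withinWindow(10_000L, 11_000L, before, after)); // true
        System.out.println(withinWindow(10_000L, 11_001L, before, after)); // false
    }
}
```

The grace and retention settings in the snippet are separate concerns (late-record handling and store retention) and are not modelled here.
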
> ClassCastException using KTable.suppress()
> ------------------------------------------
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
> Issue Type: Bug
> Reporter: Andrew
> Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error:
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
>     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows:
> {Code}
> final KTable<Windowed<Object>, GenericRecord> groupTable = groupedStream
>     .aggregate(lastAggregator, lastAggregator, materialized);
> final KTable<Windowed<Object>, GenericRecord> suppressedTable =
>     groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> // write the change-log stream to the topic
> suppressedTable.toStream((k, v) -> k.key())
>     .mapValues(joinValueMapper::apply)
>     .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works. What am I doing wrong?
> Someone else has encountered the same issue:
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800
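
The stack trace in the quoted description shows `StringSerializer.serialize` being handed a `Windowed` key from inside `KTableSuppressProcessor.buffer`. One plausible reading, inferred from the trace rather than confirmed by the report, is that a `String` key serializer was picked up for a key that is actually `Windowed`. The following Kafka-free sketch reproduces that kind of failure. Every type in it (`Serializer`, `StringSerializer`, `Windowed`, `bufferKey`) is a hypothetical stand-in, not the real Kafka class.

```java
import java.nio.charset.StandardCharsets;

final class SerdeMismatchSketch {

    // Stand-in for a serializer interface; NOT the real Kafka Serializer.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a windowed key wrapper; NOT the real Kafka Windowed.
    static final class Windowed<K> {
        final K key;
        final long windowStartMs;

        Windowed(final K key, final long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    // Stand-in for a String-only serializer.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(final String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a processor that applies whatever key serializer it was given
    // without checking that it matches the runtime key type. The unchecked
    // cast compiles, and the mismatch only surfaces when serialize() runs.
    @SuppressWarnings("unchecked")
    static <K> byte[] bufferKey(final Serializer<?> keySerializer, final K key) {
        return ((Serializer<K>) keySerializer).serialize(key);
    }

    // Returns true if serializing the key with StringSerializer throws
    // ClassCastException, false if serialization succeeds.
    static boolean failsWithClassCast(final Object key) {
        try {
            bufferKey(new StringSerializer(), key);
            return false;
        } catch (final ClassCastException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(failsWithClassCast("plain-key"));                    // false
        System.out.println(failsWithClassCast(new Windowed<>("plain-key", 0L))); // true
    }
}
```

Because generic type parameters are erased at runtime, the mismatch is invisible to the compiler and appears only as a `ClassCastException` inside the serializer, which matches the shape of the trace above.
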