Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2476#discussion_r158124954
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
---
@@ -89,19 +87,19 @@ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords);
             for (TridentTuple tuple : tuples) {
                 topic = topicSelector.getTopic(tuple);
-                Object messageFromTuple = mapper.getMessageFromTuple(tuple);
-                Object keyFromTuple = mapper.getKeyFromTuple(tuple);
+                V messageFromTuple = mapper.getMessageFromTuple(tuple);
+                K keyFromTuple = mapper.getKeyFromTuple(tuple);
                 if (topic != null) {
                     if (messageFromTuple != null) {
-                        Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, keyFromTuple, messageFromTuple));
+                        Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, keyFromTuple, messageFromTuple));
--- End diff --
Pretty sure the diamond operator (`<>`) will also work here, i.e. `new ProducerRecord<>(topic, keyFromTuple, messageFromTuple)`.
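
A minimal sketch of why the diamond should be enough, assuming the producer is declared with the same `K` and `V` as the key and value. `Record` and `Sender` below are hypothetical stand-ins for `ProducerRecord` and the Kafka producer, so the snippet compiles without Kafka on the classpath; they are not the real API.

```java
import java.util.ArrayList;
import java.util.List;

public class DiamondSketch {
    // Hypothetical stand-in for ProducerRecord<K, V>.
    static final class Record<K, V> {
        final String topic;
        final K key;
        final V value;

        Record(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a typed producer; it only collects what was sent.
    static final class Sender<K, V> {
        final List<Record<K, V>> sent = new ArrayList<>();

        void send(Record<K, V> record) {
            sent.add(record);
        }
    }

    static <K, V> Sender<K, V> sendBothWays(String topic, K key, V value) {
        Sender<K, V> sender = new Sender<>();
        // Explicit type arguments, as written in the diff.
        sender.send(new Record<K, V>(topic, key, value));
        // Diamond: the compiler infers K and V from the constructor arguments.
        sender.send(new Record<>(topic, key, value));
        return sender;
    }

    public static void main(String[] args) {
        Sender<String, Integer> sender = sendBothWays("events", "id-1", 42);
        System.out.println(sender.sent.size());
    }
}
```

Both calls produce the same parameterized type, so the diamond form only drops the repeated `<K, V>`.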
---