[ https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim updated STORM-1363: -------------------------------- Fix Version/s: (was: 1.1.1) 1.1.0 Change fix version since Storm 1.1.0 RC vote canceled. > TridentKafkaState should handle null values from > TridentTupleToKafkaMapper.getMessageFromTuple() > ------------------------------------------------------------------------------------------------ > > Key: STORM-1363 > URL: https://issues.apache.org/jira/browse/STORM-1363 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka, storm-kafka-client > Affects Versions: 0.10.1, 1.x > Reporter: Sachin Pasalkar > Assignee: Sachin Pasalkar > Fix For: 2.0.0, 1.1.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > > If you look at the updateState API of storm.kafka.trident.TridentKafkaState. > When producer is sending data its not handling if the null value is sent by > mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as > "null" string. There might be case in particular kind of exception user do > not want to replay tuple and just report it and with that he needs to return > null. > Also make the members as protected as I need to copy-paste the class to > provide my implementation. > My updateState API looks like this > {code} > public void updateState(List<TridentTuple> tuples, TridentCollector > collector) { > String topic = null; > for (TridentTuple tuple : tuples) { > if(tuple==null) { > continue; > } > Object keyFromTuple = null; > try { > keyFromTuple = mapper.getKeyFromTuple(tuple); > topic = topicSelector.getTopic(tuple); > Object messageFromTuple = > mapper.getMessageFromTuple(tuple); > if (topic != null && messageFromTuple != null) { > producer.send(new KeyedMessage(topic, > keyFromTuple, messageFromTuple)); > } else { > LOG.warn("skipping key = " + > keyFromTuple + ", topic selector returned null."); > } > } catch (Exception ex) { > String errorMsg = "Could not send message with > key = " + keyFromTuple + " to topic = " + topic; > LOG.warn(errorMsg, ex); > throw new FailedException(errorMsg, ex); > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)