[ 
https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated STORM-1363:
--------------------------------
    Fix Version/s:     (was: 1.1.1)
                   1.1.0

Change fix version since Storm 1.1.0 RC vote canceled.

> TridentKafkaState should handle null values from 
> TridentTupleToKafkaMapper.getMessageFromTuple()
> ------------------------------------------------------------------------------------------------
>
>                 Key: STORM-1363
>                 URL: https://issues.apache.org/jira/browse/STORM-1363
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka, storm-kafka-client
>    Affects Versions: 0.10.1, 1.x
>            Reporter: Sachin Pasalkar
>            Assignee: Sachin Pasalkar
>             Fix For: 2.0.0, 1.1.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> If you look at the updateState API of storm.kafka.trident.TridentKafkaState. 
> When producer is sending data its not handling if the null value is sent by 
> mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as 
> "null" string. There might be case in particular kind of exception user do 
> not want to replay tuple and just report it and with that he needs to return 
> null.
> Also make the members as protected as I need to copy-paste the class to 
> provide my implementation.
> My updateState API looks like this
> {code}
> public void updateState(List<TridentTuple> tuples, TridentCollector 
> collector) {
>       String topic = null;
>               for (TridentTuple tuple : tuples) {
>                       if(tuple==null) {
>                               continue;
>                       }
>                       Object keyFromTuple = null;
>                       try {
>                               keyFromTuple = mapper.getKeyFromTuple(tuple);
>                               topic = topicSelector.getTopic(tuple);
>                               Object messageFromTuple = 
> mapper.getMessageFromTuple(tuple);
>                               if (topic != null && messageFromTuple != null) {
>                                       producer.send(new KeyedMessage(topic, 
> keyFromTuple, messageFromTuple));
>                               } else {
>                                       LOG.warn("skipping key = " + 
> keyFromTuple + ", topic selector returned null.");
>                               }
>                       } catch (Exception ex) {
>                               String errorMsg = "Could not send message with 
> key = " + keyFromTuple + " to topic = " + topic;
>                               LOG.warn(errorMsg, ex);
>                               throw new FailedException(errorMsg, ex);
>                       }
>               }
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to