[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712696#comment-15712696 ]
P. Taylor Goetz commented on STORM-2228:
----------------------------------------

[~revans2] I agree that this is serious and needs to be fixed. I think some degree of backward incompatibility is warranted.

> KafkaSpout does not replay properly when a topic maps to multiple streams
> -------------------------------------------------------------------------
>
>                 Key: STORM-2228
>                 URL: https://issues.apache.org/jira/browse/STORM-2228
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>            Priority: Blocker
>
> In the example KafkaSpoutTopologyMainNamedTopics.java, the code creates a TuplesBuilder and a KafkaSpoutStreams:
> {code}
> protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>         new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
>         new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
>         .build();
> }
>
> protected KafkaSpoutStreams getKafkaSpoutStreams() {
>     final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
>     final Fields outputFields1 = new Fields("topic", "partition", "offset");
>     return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0],
>             new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1 sent to test_stream
>         .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
>         .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
>         .build();
> }
> {code}
> Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", "key", "value")}}, and output them on {{STREAMS\[0]}}.
> Then, just for {{TOPICS\[2]}}, they want it to be output as {{Fields("topic", "partition", "offset")}} to {{STREAMS\[2]}}. (Don't know what happened to {{STREAMS\[1]}}.)
> There are two issues here. The first is with how the TupleBuilder and the SpoutStreams are split up yet coupled: {{STREAMS\[2]}} is actually getting the full "topic", "partition", "offset", "key", "value", but this is minor. The real issue is that the code uses the same KafkaSpoutMessageId for all the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.
> https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304
> The code, however, is written to assume that it will only ever get one ack/fail for a given KafkaSpoutMessageId. This means that if one of the emitted tuple trees succeeds and then the other fails, the failure will not result in anything being replayed! This violates how Storm is intended to work.
> I discovered this as a part of STORM-2225, and I am fine with fixing it on STORM-2225 (I would just remove support for that functionality, because there are other ways of doing this correctly). But that would not maintain backwards compatibility, and I am not sure it would be appropriate for 1.x releases. I really would like to have feedback from others on this.
> I can put something into 1.x where it will throw an exception if acking is enabled and this situation is present, but I don't want to spend the time trying to do reference counting on the number of tuples actually emitted. If someone else wants to do that, I would be happy to turn this JIRA over to them.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
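For illustration, the reference counting on emitted tuples that the comment mentions could look roughly like the sketch below. This is only a hedged sketch, not storm-kafka-client code: the class and method names (RefCountedMessageId, ack, fail, shouldReplay, readyToCommit) are hypothetical. The idea is that a message id shared by several emitted copies must track every outstanding ack, and a single fail must force a replay instead of being lost.

```java
// Hypothetical sketch: a per-message id that reference-counts the copies of
// a Kafka record emitted to multiple streams. The offset may only be
// committed once every copy is acked; any single fail marks the whole
// record for replay. None of these names exist in storm-kafka-client.
public class RefCountedMessageId {
    private int pendingAcks;   // copies emitted but not yet acked
    private boolean failed;    // set once any copy fails

    public RefCountedMessageId(int emittedCopies) {
        this.pendingAcks = emittedCopies;
    }

    /** Called once per acked copy; returns true when fully acked and never failed. */
    public synchronized boolean ack() {
        pendingAcks--;
        return pendingAcks == 0 && !failed;
    }

    /** Called when any copy fails; the whole record must be replayed. */
    public synchronized void fail() {
        failed = true;
    }

    public synchronized boolean shouldReplay() {
        return failed;
    }

    public synchronized boolean readyToCommit() {
        return pendingAcks == 0 && !failed;
    }
}
```

With this bookkeeping, the scenario from the comment (one tuple tree acks, the other fails) would correctly leave the record marked for replay rather than silently dropping the failure.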