Robert Joseph Evans created STORM-2228:
------------------------------------------

             Summary: KafkaSpout does not replay properly when a topic maps to 
multiple streams
                 Key: STORM-2228
                 URL: https://issues.apache.org/jira/browse/STORM-2228
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
            Reporter: Robert Joseph Evans
            Assignee: Robert Joseph Evans
            Priority: Blocker


In the example KafkaSpoutTopologyMainNamedTopics.java, the code creates a TuplesBuilder and a KafkaSpoutStreams:

{code}
protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
            new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
            new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
            .build();
}

protected KafkaSpoutStreams getKafkaSpoutStreams() {
    final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
    final Fields outputFields1 = new Fields("topic", "partition", "offset");
    return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
            .build();
}
{code}

Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", "key", "value")}}, and output them on {{STREAMS\[0]}}. Then, just for {{TOPICS\[2]}}, they want it to be output as {{Fields("topic", "partition", "offset")}} to {{STREAMS\[2]}}.  (I don't know what happened to {{STREAMS\[1]}}.)

There are two issues here.  The first is with how the TupleBuilder and the SpoutStreams are split up yet still coupled: {{STREAMS\[2]}} actually gets the full "topic", "partition", "offset", "key", "value" fields, but this is minor.  The real issue is that the code uses the same KafkaSpoutMessageId for all the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.

https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304

The code, however, is written assuming that it will only ever get one ack/fail for a given KafkaSpoutMessageId.  This means that if one of the emitted tuple trees succeeds and the other then fails, the failure will not result in anything being replayed!  This violates how Storm is intended to work.
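A minimal, self-contained model of that failure mode (hypothetical class and method names, not Storm's real KafkaSpout internals): if the spout only tracks whether a message id is pending, the second emit with the same id is invisible, the first ack clears the id, and the later fail finds nothing left to replay.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the bug: one message id shared by two emitted tuples,
// but the bookkeeping only knows "pending or not pending".
public class SharedMessageIdBug {
    private final Set<Long> pending = new HashSet<>();
    int replayed = 0;

    void emit(long msgId) { pending.add(msgId); }      // second emit with same id is a no-op
    void ack(long msgId)  { pending.remove(msgId); }   // first ack clears the shared id
    void fail(long msgId) {
        if (pending.remove(msgId)) { replayed++; }     // replays only if still pending
    }

    public static void main(String[] args) {
        SharedMessageIdBug spout = new SharedMessageIdBug();
        long msgId = 42L;
        spout.emit(msgId);  // tuple to one stream
        spout.emit(msgId);  // same id reused for the other stream
        spout.ack(msgId);   // first tuple tree succeeds
        spout.fail(msgId);  // second tree fails -> nothing to replay
        System.out.println("replayed=" + spout.replayed);  // prints replayed=0
    }
}
```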

I discovered this as a part of STORM-2225, and I am fine with fixing it as part of STORM-2225 (I would just remove support for that functionality, because there are other ways of doing this correctly).  But that would not maintain backwards compatibility, and I am not sure it would be appropriate for 1.x releases.  I would really like to have feedback from others on this.

I can put something into 1.x that will throw an exception if acking is enabled and this situation is present, but I don't want to spend the time trying to do reference counting on the number of tuples actually emitted.  If someone else wants to do that, I would be happy to turn this JIRA over to them.
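For reference, a hedged sketch of what that reference counting could look like (again hypothetical names, not a patch against KafkaSpout): count the tuples emitted per message id, decrement on each ack/fail, and only commit or replay once every emitted tuple has been settled.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of reference-counted acking: a message id is settled only
// after all tuples emitted under it have been acked or failed.
public class RefCountedAcking {
    private static final class Pending { int outstanding; boolean anyFailed; }
    private final Map<Long, Pending> pending = new HashMap<>();
    int replayed = 0;
    int committed = 0;

    void emit(long msgId) {
        pending.computeIfAbsent(msgId, id -> new Pending()).outstanding++;
    }
    void ack(long msgId)  { settle(msgId, false); }
    void fail(long msgId) { settle(msgId, true); }

    private void settle(long msgId, boolean failed) {
        Pending p = pending.get(msgId);
        if (p == null) return;
        p.anyFailed |= failed;
        if (--p.outstanding == 0) {        // last outstanding tuple settled
            pending.remove(msgId);
            if (p.anyFailed) replayed++;   // any failure -> replay the message
            else committed++;              // all acked -> safe to commit offset
        }
    }

    public static void main(String[] args) {
        RefCountedAcking spout = new RefCountedAcking();
        spout.emit(42L);
        spout.emit(42L);   // same id, two streams
        spout.ack(42L);    // one tree succeeds
        spout.fail(42L);   // the other fails
        System.out.println("replayed=" + spout.replayed);  // prints replayed=1
    }
}
```

The design choice here is that a single fail taints the whole message id, so the record is replayed even if some of its tuple trees succeeded, which matches Storm's at-least-once semantics.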



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
