Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1826#discussion_r92921273
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -77,7 +77,7 @@
         private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
         private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
     
    -    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
    +    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
    --- End diff --
    
    Yes, I agree that we should move OffsetEntry out of KafkaSpout. I also agree that we should be able to test the spout without peeking directly into the spout's fields.
    
    Here's what I suggest:
    
    * We move Timer out into a separate class and use a TimerFactory to create instances in KafkaSpout (basically the same solution we use for mocking KafkaConsumer on master). That way we can write tests where time is simulated, which helps when we want to test that the spout commits offsets (or performs any other periodic action) correctly.
    
    * We move OffsetEntry out into a new class and rename it. It looks like the only KafkaSpout state it touches is `numUncommittedOffsets`, which `OffsetEntry.commit()` updates. Since that method returns void, we can remove the dependency easily by having it return `numCommittedOffsets` instead and letting the spout update its own counter. If we want to support other OffsetEntry implementations, we can set up a factory for that too, but I think that can wait until we actually need to support multiple implementations.
    
    * We rewrite the offending tests here to use a mocked KafkaConsumer and KafkaTimer instead of KafkaUnit, and make the tests check that the offsets are committed on the consumer instead of reading `KafkaSpout.acked` directly. I think KafkaUnit is a good idea for integration tests, but I don't think it is a good fit for the tests in this PR.
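    This is a minimal sketch for the first point. It shows what an extracted timer could look like if it reads time from an injected clock instead of the system clock. `Timer`, `TimerFactory`, `FakeClock` and `isExpiredResetOnTrue()` are illustrative names for this sketch, not a confirmed Storm API. The spout would receive a `TimerFactory`, and a test can pass it one backed by a manually advanced clock.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical periodic timer extracted from KafkaSpout; time comes from an injected clock. */
class Timer {
    private final long periodNanos;
    private final LongSupplier nanoClock;
    private long start;

    Timer(long period, TimeUnit unit, LongSupplier nanoClock) {
        this.periodNanos = unit.toNanos(period);
        this.nanoClock = nanoClock;
        this.start = nanoClock.getAsLong();
    }

    /** Returns true once the period has elapsed, and restarts the period when it does. */
    boolean isExpiredResetOnTrue() {
        long now = nanoClock.getAsLong();
        if (now - start >= periodNanos) {
            start = now;
            return true;
        }
        return false;
    }
}

/** Hypothetical factory the spout would call instead of constructing Timer directly. */
interface TimerFactory {
    Timer createTimer(long period, TimeUnit unit);
}

/** Manually advanced clock for tests: time only moves when the test says so. */
class FakeClock implements LongSupplier {
    private long nanos;

    void advance(long amount, TimeUnit unit) {
        nanos += unit.toNanos(amount);
    }

    @Override
    public long getAsLong() {
        return nanos;
    }
}

class TimerDemo {
    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        TimerFactory factory = (period, unit) -> new Timer(period, unit, clock);
        Timer commitTimer = factory.createTimer(30, TimeUnit.SECONDS);
        System.out.println(commitTimer.isExpiredResetOnTrue()); // false
        clock.advance(30, TimeUnit.SECONDS);
        System.out.println(commitTimer.isExpiredResetOnTrue()); // true
        System.out.println(commitTimer.isExpiredResetOnTrue()); // false, period restarted
    }
}
```

    Production code would pass a factory backed by `System::nanoTime`. Tests control time explicitly, so they never need to sleep.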
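    This is a simplified sketch for the second point. It shows a standalone replacement for `OffsetEntry`, called `OffsetManager` here purely for illustration. Its `commit()` returns the number of committed offsets, so the spout can maintain `numUncommittedOffsets` itself. It assumes a single partition with acks reported as plain `long` offsets, and it leaves out everything else a real implementation would need.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical standalone offset tracker for one partition; it never touches spout fields. */
class OffsetManager {
    private final NavigableSet<Long> ackedOffsets = new TreeSet<>();
    private long committedOffset; // next offset to commit (Kafka convention: last processed + 1)

    OffsetManager(long initialCommittedOffset) {
        this.committedOffset = initialCommittedOffset;
    }

    void addAcked(long offset) {
        ackedOffsets.add(offset);
    }

    /** Returns the end of the contiguous run of acked offsets starting at committedOffset. */
    long findNextCommitOffset() {
        long next = committedOffset;
        for (long offset : ackedOffsets) { // TreeSet iterates in ascending order
            if (offset < next) continue;   // already committed, e.g. a duplicate ack
            if (offset != next) break;     // gap: later offsets must wait
            next++;
        }
        return next;
    }

    /** Marks offsets below nextCommitOffset as committed and returns how many that was. */
    long commit(long nextCommitOffset) {
        long numCommittedOffsets = nextCommitOffset - committedOffset;
        ackedOffsets.headSet(nextCommitOffset).clear(); // headSet is exclusive of its bound
        committedOffset = nextCommitOffset;
        return numCommittedOffsets;
    }
}

class OffsetManagerDemo {
    public static void main(String[] args) {
        long numUncommittedOffsets = 4; // spout-owned counter in this sketch
        OffsetManager manager = new OffsetManager(0);
        manager.addAcked(0);
        manager.addAcked(1);
        manager.addAcked(3);            // offset 2 is not acked yet
        long next = manager.findNextCommitOffset();
        // the spout would call the consumer's commit here, then update its own counter
        numUncommittedOffsets -= manager.commit(next);
        System.out.println(next + " " + numUncommittedOffsets); // prints "2 2"
    }
}
```

    The spout keeps ownership of its counter. The offset tracker reports how much it committed, which also makes it easy to unit test in isolation.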
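    For the third point, the real test would mock `KafkaConsumer` (for example with Mockito) and verify the `commitSync(...)` call. To stay dependency-free, this sketch uses three stand-ins instead:

    * a hypothetical `OffsetCommitter` interface in place of the consumer,
    * a recording test double, and
    * a heavily simplified `MiniSpout`.

    The test asserts on the commits the double received rather than on the spout's internal `acked` map. It simulates time with a settable clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the one consumer method this sketch needs. */
interface OffsetCommitter {
    void commitSync(Map<String, Long> offsets); // partition name -> next offset to consume
}

/** Hypothetical, heavily simplified spout that commits acked offsets once per period. */
class MiniSpout {
    private final OffsetCommitter consumer;
    private final LongSupplier clockMillis;
    private final long commitPeriodMillis;
    private long lastCommitMillis;
    // simplification: assumes acks arrive in offset order, so there is no gap tracking
    private final Map<String, Long> ackedUpTo = new HashMap<>();

    MiniSpout(OffsetCommitter consumer, LongSupplier clockMillis, long commitPeriodMillis) {
        this.consumer = consumer;
        this.clockMillis = clockMillis;
        this.commitPeriodMillis = commitPeriodMillis;
        this.lastCommitMillis = clockMillis.getAsLong();
    }

    void ack(String partition, long offset) {
        ackedUpTo.merge(partition, offset + 1, Long::max);
    }

    /** In a real spout this check would run inside nextTuple(). */
    void nextTuple() {
        long now = clockMillis.getAsLong();
        if (now - lastCommitMillis >= commitPeriodMillis) {
            lastCommitMillis = now;
            if (!ackedUpTo.isEmpty()) {
                consumer.commitSync(new HashMap<>(ackedUpTo));
                ackedUpTo.clear();
            }
        }
    }
}

/** Test double that records each commit instead of talking to a broker. */
class RecordingCommitter implements OffsetCommitter {
    final List<Map<String, Long>> commits = new ArrayList<>();

    @Override
    public void commitSync(Map<String, Long> offsets) {
        commits.add(offsets);
    }
}

class MiniSpoutDemo {
    public static void main(String[] args) {
        long[] now = {0}; // simulated clock in milliseconds
        RecordingCommitter consumer = new RecordingCommitter();
        MiniSpout spout = new MiniSpout(consumer, () -> now[0], 30_000);
        spout.ack("topic-0", 0);
        spout.ack("topic-0", 1);
        spout.nextTuple();                    // period not elapsed: no commit
        now[0] += 30_000;
        spout.nextTuple();                    // period elapsed: commit
        System.out.println(consumer.commits); // prints [{topic-0=2}]
    }
}
```

    With Mockito and the real `KafkaConsumer`, the final check would become a `verify(...)` on `commitSync` with the expected `TopicPartition` to `OffsetAndMetadata` map. Either way, the test needs no broker and no KafkaUnit.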
    
    I'll try implementing this on master; if it works out, I'll open a PR there first. We can port the improvements to 1.x once we agree on the solution on master.

