Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/1826#discussion_r92921273
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -77,7 +77,7 @@
     private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
     private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
-    private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
--- End diff --
Yes, I agree that we should move OffsetEntry out of KafkaSpout. I also
agree that we should be able to test the spout without peeking directly into
the spout's fields.
Here's what I suggest:
* We move Timer out into a separate class and use a TimerFactory to create
instances in KafkaSpout (basically the same approach we use for mocking
KafkaConsumer on master). That way we can write tests where time is
simulated, which helps if we want to test that the spout commits offsets
correctly (or performs any other periodic action).
* We move OffsetEntry out into its own class and rename it. It looks like the
only piece of KafkaSpout state it accesses is `numUncommittedOffsets`, in
`OffsetEntry.commit()`. Since that method returns void, we can remove the
dependency easily by having it return `numCommittedOffsets` and letting the
spout update its own counter. If we want to support other OffsetEntry
implementations, we can set up a factory for that too, but I think that can
wait until we actually need to support multiple implementations.
* We rewrite the offending tests here to use a mocked KafkaConsumer and
KafkaTimer instead of KafkaUnit, and make the tests check that the offsets
are committed on the consumer instead of reading `KafkaSpout.acked` directly. I
think KafkaUnit is a good fit for integration tests, but not for the tests in
this PR.
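A minimal sketch of the first point, assuming the timer only needs a fixed period and an "expired, then reset" check. `KafkaTimer`, `KafkaTimerFactory`, `SimulatedClock` and `isExpiredResetOnTrue` are hypothetical names chosen for illustration, not existing Storm classes. The idea is that the timer reads time from an injected source, so a test can advance time by hand instead of sleeping:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical standalone timer: reads time from an injected nanosecond clock
// instead of calling System.nanoTime() directly.
class KafkaTimer {
    private final long periodNanos;
    private final LongSupplier nanoClock;
    private long startNanos;

    KafkaTimer(long period, TimeUnit unit, LongSupplier nanoClock) {
        this.periodNanos = unit.toNanos(period);
        this.nanoClock = nanoClock;
        this.startNanos = nanoClock.getAsLong();
    }

    /** Returns true (and starts a new period) if the current period has elapsed. */
    boolean isExpiredResetOnTrue() {
        long now = nanoClock.getAsLong();
        if (now - startNanos >= periodNanos) {
            startNanos = now;
            return true;
        }
        return false;
    }
}

// Hypothetical factory that KafkaSpout would receive, e.g. through a constructor.
interface KafkaTimerFactory {
    KafkaTimer create(long period, TimeUnit unit);

    // Production wiring: real monotonic time.
    KafkaTimerFactory SYSTEM = (period, unit) -> new KafkaTimer(period, unit, System::nanoTime);
}

// Test double: time only moves when the test advances it.
class SimulatedClock implements LongSupplier {
    private long nanos;

    @Override
    public long getAsLong() {
        return nanos;
    }

    void advance(long amount, TimeUnit unit) {
        nanos += unit.toNanos(amount);
    }
}
```

In a test, the spout would be given `(period, unit) -> new KafkaTimer(period, unit, clock)` for some `SimulatedClock clock`, while production code would use `KafkaTimerFactory.SYSTEM`.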
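For the second point, here is a rough sketch of what a standalone replacement for OffsetEntry could look like. `OffsetTracker` and its methods are hypothetical, and the bookkeeping is deliberately simplified (one partition, plain `long` offsets, no gaps from log compaction). The relevant part is that `commit` returns how many offsets it committed instead of touching spout state:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical standalone replacement for KafkaSpout's inner OffsetEntry.
// Tracks acked offsets for a single partition and never touches spout fields.
class OffsetTracker {
    private final NavigableSet<Long> ackedOffsets = new TreeSet<>();
    // Kafka convention: the committed offset is the next offset to consume.
    private long committedOffset;

    OffsetTracker(long initialFetchOffset) {
        this.committedOffset = initialFetchOffset;
    }

    void ack(long offset) {
        if (offset >= committedOffset) {
            ackedOffsets.add(offset);
        }
    }

    /**
     * Returns the offset to commit: one past the end of the contiguous run of
     * acked offsets starting at committedOffset, or -1 if nothing can be committed.
     */
    long findNextCommitOffset() {
        long next = committedOffset;
        while (ackedOffsets.contains(next)) {
            next++;
        }
        return next == committedOffset ? -1 : next;
    }

    /**
     * Marks every acked offset below nextCommitOffset as committed and returns
     * how many there were, so the caller can update numUncommittedOffsets itself.
     */
    long commit(long nextCommitOffset) {
        long numCommittedOffsets = 0;
        while (!ackedOffsets.isEmpty() && ackedOffsets.first() < nextCommitOffset) {
            ackedOffsets.pollFirst();
            numCommittedOffsets++;
        }
        committedOffset = nextCommitOffset;
        return numCommittedOffsets;
    }
}
```

The spout-side call would then shrink to something like `numUncommittedOffsets -= tracker.commit(nextCommitOffset);`.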
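The third point can be illustrated without a broker. To keep the sketch dependency-free, it uses a hypothetical `OffsetCommitter` interface with a recording fake instead of a Mockito mock of `KafkaConsumer` (kafka-clients also ships a `MockConsumer` that could fill this role), and a hypothetical `MiniSpout` that stands in for KafkaSpout with an injected clock. The test asserts on what reached the consumer, not on the spout's private map:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the one consumer method the test cares about.
// Partitions are plain strings here instead of Kafka's TopicPartition.
interface OffsetCommitter {
    void commitSync(Map<String, Long> offsets);
}

// Recording fake: plays the role of a mocked KafkaConsumer.
class RecordingCommitter implements OffsetCommitter {
    final List<Map<String, Long>> commits = new ArrayList<>();

    @Override
    public void commitSync(Map<String, Long> offsets) {
        commits.add(new HashMap<>(offsets)); // copy, since the spout clears its map
    }
}

// Hypothetical, heavily simplified spout. Assumes acks arrive in order,
// so it only keeps the highest acked offset per partition.
class MiniSpout {
    private final OffsetCommitter consumer;
    private final LongSupplier clockMillis;
    private final long commitPeriodMillis;
    private long lastCommitMillis;
    private final Map<String, Long> pendingCommits = new HashMap<>(); // private: tests never read it

    MiniSpout(OffsetCommitter consumer, LongSupplier clockMillis, long commitPeriodMillis) {
        this.consumer = consumer;
        this.clockMillis = clockMillis;
        this.commitPeriodMillis = commitPeriodMillis;
        this.lastCommitMillis = clockMillis.getAsLong();
    }

    void ack(String partition, long offset) {
        // Kafka convention: commit the offset of the next record to consume.
        pendingCommits.merge(partition, offset + 1, Long::max);
    }

    // Periodic work normally done in nextTuple(): commit once the period has elapsed.
    void nextTuple() {
        long now = clockMillis.getAsLong();
        if (now - lastCommitMillis >= commitPeriodMillis && !pendingCommits.isEmpty()) {
            consumer.commitSync(pendingCommits);
            pendingCommits.clear();
            lastCommitMillis = now;
        }
    }
}
```

Because both time and the consumer are injected, the test drives the commit deterministically, and the spout's fields can stay private.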
I'll try implementing this on master; if it works out, I'll open a PR there
first. We can port the improvements to 1.x once we agree on the solution on
master.