GitHub user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1999#discussion_r106291741
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -383,23 +403,24 @@ private void commitOffsetsForAckedTuples() {
@Override
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
- if(!emitted.contains(msgId)) {
- LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
- return;
- }
-
- if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
- acked.get(msgId.getTopicPartition()).add(msgId);
+ if (!emitted.contains(msgId)) {
+ LOG.debug("Received ack for message [{}], which is associated with either a null tuple that was never emitted, " +
--- End diff --
@srdo The reason I decided not to add msgId to `emitted` is that it would be a big hack: we would be setting the spout's internal state as if a message had been emitted when it really wasn't.
We can probably fix this by adding a boolean field `emitted` to msgId (`KafkaSpoutMessageId`) that defaults to true, and setting it to false in the particular scenario where we ack directly.
We cannot log the null-tuple message in `RecordTranslator` because that interface is implemented by the user. This message is internal to the spout and should not depend on third-party (user) implementations.
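To illustrate keeping that log line inside the spout, here is a minimal sketch, assuming a hypothetical `NullTupleHandling` class and a plain `Function<String, List<Object>>` standing in for the user-supplied `RecordTranslator` (the real interface takes a Kafka `ConsumerRecord`). It uses `java.util.logging` only to stay self-contained; Storm itself logs through SLF4J. Whatever the user's translator does, the null check, the log message, and the direct ack all live in spout code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical spout-side handling of a translator that returns null.
final class NullTupleHandling {
    private static final Logger LOG = Logger.getLogger(NullTupleHandling.class.getName());

    final List<List<Object>> emittedTuples = new ArrayList<>();
    final List<Long> directlyAckedOffsets = new ArrayList<>();

    // Stand-in for the user-implemented RecordTranslator.
    private final Function<String, List<Object>> translator;

    NullTupleHandling(Function<String, List<Object>> translator) {
        this.translator = translator;
    }

    void emitOrAck(long offset, String recordValue) {
        List<Object> tuple = translator.apply(recordValue);
        if (tuple == null) {
            // Logged by the spout, so the message does not depend on user code.
            LOG.fine(() -> "Translator returned null for offset " + offset + "; acking directly");
            directlyAckedOffsets.add(offset);
            return;
        }
        emittedTuples.add(tuple);
    }
}
```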