GitHub user choojoyq opened a pull request:
https://github.com/apache/storm/pull/2726
STORM-3090 - Fix bug when different topics use the same offset for a
partition
In the current implementation of `ZkCoordinator`, deleted partition managers
are used as state holders for newly created partition managers. This behaviour
was introduced in the scope of
[this](https://issues-test.apache.org/jira/browse/STORM-2296) ticket. However,
the existing lookup is based only on the partition number:
```
Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
    deletedManagers.put(id.partition, _managers.remove(id));
}
for (PartitionManager manager : deletedManagers.values()) {
    if (manager != null) manager.close();
}
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
    PartitionManager man = new PartitionManager(
        _connections,
        _topologyInstanceId,
        _state,
        _topoConf,
        _spoutConfig,
        id,
        deletedManagers.get(id.partition));
    _managers.put(id, man);
}
```
This is incorrect, because the same task can manage multiple partitions that
have the same number but belong to different topics. In that case all new
partition managers obtain the same offset value from an arbitrary deleted
partition manager (the `HashMap` keeps only one entry per partition number),
and all fetch requests for the new partition managers fail with
`TopicOffsetOutOfRangeException`. Some of them recover via the logic below if
the assigned offset is smaller than the real one, but the others keep failing
with the offset out of range exception, which prevents fetching messages from
Kafka.
```
if (offset > _emittedToOffset) {
    _lostMessageCount.incrBy(offset - _emittedToOffset);
    _emittedToOffset = offset;
    LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
}
```
I assume that the state holder lookup should be based on both topic and
partition number.
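To illustrate the difference, here is a minimal standalone sketch, not the actual patch. It does not use the storm-kafka classes: `DeletedState`, `Key`, and the topic names `orders` and `clicks` are hypothetical stand-ins, and a plain `long` offset replaces the real `PartitionManager` state. It only shows that a map keyed by partition number collapses two topics into one entry, while a map keyed by topic and partition keeps them apart.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class DeletedManagerLookup {

    /** Hypothetical composite key: topic plus partition number. */
    static final class Key {
        final String topic;
        final int partition;

        Key(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key other = (Key) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Hypothetical stand-in for the state held by a deleted partition manager. */
    static final class DeletedState {
        final String topic;
        final int partition;
        final long offset;

        DeletedState(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Lookup keyed by partition number only: later entries overwrite earlier ones. */
    static Map<Integer, Long> indexByPartition(List<DeletedState> deleted) {
        Map<Integer, Long> index = new HashMap<>();
        for (DeletedState s : deleted) {
            index.put(s.partition, s.offset);
        }
        return index;
    }

    /** Lookup keyed by topic and partition number: each topic keeps its own offset. */
    static Map<Key, Long> indexByTopicAndPartition(List<DeletedState> deleted) {
        Map<Key, Long> index = new HashMap<>();
        for (DeletedState s : deleted) {
            index.put(new Key(s.topic, s.partition), s.offset);
        }
        return index;
    }

    public static void main(String[] args) {
        // Two topics that both have a partition 0, at very different offsets.
        List<DeletedState> deleted = Arrays.asList(
            new DeletedState("orders", 0, 1000L),
            new DeletedState("clicks", 0, 50L));

        Map<Integer, Long> byNumber = indexByPartition(deleted);
        Map<Key, Long> byKey = indexByTopicAndPartition(deleted);

        System.out.println(byNumber.size());                  // 1: the topics collide
        System.out.println(byNumber.get(0));                  // 50: "orders" would get the "clicks" offset
        System.out.println(byKey.get(new Key("orders", 0)));  // 1000
        System.out.println(byKey.get(new Key("clicks", 0)));  // 50
    }
}
```

In this sketch the input is a `List`, so the surviving entry is always the last one added; with an unordered collection of deleted partitions, which entry survives would depend on iteration order.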
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/choojoyq/storm STORM-3090
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2726.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2726
----
commit 808e6be3edc7f83c80ad2ad4fb0a11091e81cc66
Author: Nikita Gorbachevsky <nikitag@...>
Date: 2018-06-19T14:02:28Z
STORM-3090 - use topic together with partition number during recreation of
partition managers
----