GitHub user choojoyq opened a pull request:

    https://github.com/apache/storm/pull/2726

    STORM-3090 - Fix bug when different topics use the same offset for a 
partition

    In the current implementation of `ZkCoordinator`, deleted partition managers
    are used as state holders for newly created partition managers. This behaviour
    was introduced in
    [STORM-2296](https://issues-test.apache.org/jira/browse/STORM-2296). However,
    the existing lookup is based only on the partition number.
    
    ```
    Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
    for (Partition id : deletedPartitions) {
        deletedManagers.put(id.partition, _managers.remove(id));
    }
    for (PartitionManager manager : deletedManagers.values()) {
        if (manager != null) manager.close();
    }
    LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
    
    for (Partition id : newPartitions) {
        PartitionManager man = new PartitionManager(
            _connections,
            _topologyInstanceId,
            _state,
            _topoConf,
            _spoutConfig,
            id,
            deletedManagers.get(id.partition));
        _managers.put(id, man);
    }
    ```
    This lookup is incorrect, because the same task can manage multiple
    partitions that have the same number but belong to different topics. In that
    case all new partition managers with that number obtain the same offset value
    from an arbitrary deleted partition manager (since a `HashMap` is used), and
    fetch requests for the new partition managers fail with
    `TopicOffsetOutOfRangeException`. Some of them recover via the logic below if
    the assigned offset is smaller than the real one, but the others keep failing
    with the offset out of range exception, which prevents fetching messages from
    Kafka.
    ```
    if (offset > _emittedToOffset) {
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }
    ```
    I assume that the state holder lookup should be based on both the topic and
    the partition number.
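
    To illustrate the difference, here is a minimal, self-contained sketch. It
    does not use the storm-kafka classes: `TopicPartition`, `byPartitionNumber`
    and the topic names `orders` and `clicks` are hypothetical stand-ins, and
    plain `Long` offsets replace the deleted `PartitionManager` instances. It is
    only meant to show why a key made of the partition number alone collapses
    entries, not how the patch itself is implemented.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    class TopicPartitionLookup {

        /** Hypothetical composite key: topic name plus partition number. */
        static final class TopicPartition {
            final String topic;
            final int partition;

            TopicPartition(String topic, int partition) {
                this.topic = topic;
                this.partition = partition;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof TopicPartition)) return false;
                TopicPartition other = (TopicPartition) o;
                return partition == other.partition && topic.equals(other.topic);
            }

            @Override
            public int hashCode() {
                return Objects.hash(topic, partition);
            }
        }

        /**
         * Re-keys the saved offsets by partition number only, as the quoted
         * code does. Entries for different topics with the same partition
         * number overwrite each other, and which one survives depends on
         * the iteration order of the source map.
         */
        static Map<Integer, Long> byPartitionNumber(Map<TopicPartition, Long> deleted) {
            Map<Integer, Long> result = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> e : deleted.entrySet()) {
                result.put(e.getKey().partition, e.getValue());
            }
            return result;
        }

        public static void main(String[] args) {
            // Two deleted partitions with the same number but different topics
            // (topic names and offsets are made up for the example).
            Map<TopicPartition, Long> deleted = new HashMap<>();
            deleted.put(new TopicPartition("orders", 0), 1_000L);
            deleted.put(new TopicPartition("clicks", 0), 50L);

            // Keyed by number only: a single entry is left for partition 0.
            Map<Integer, Long> collapsed = byPartitionNumber(deleted);
            System.out.println("entries keyed by number: " + collapsed.size());
            System.out.println("offset handed to both topics: " + collapsed.get(0));

            // Keyed by topic and number: each topic keeps its own offset.
            System.out.println("orders-0: " + deleted.get(new TopicPartition("orders", 0)));
            System.out.println("clicks-0: " + deleted.get(new TopicPartition("clicks", 0)));
        }
    }
    ```

    With the composite key, each new partition manager can only inherit state
    from the deleted manager of the same topic and partition.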

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/choojoyq/storm STORM-3090

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2726.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2726
    
----
commit 808e6be3edc7f83c80ad2ad4fb0a11091e81cc66
Author: Nikita Gorbachevsky <nikitag@...>
Date:   2018-06-19T14:02:28Z

    STORM-3090 - use topic together with partition number during recreation of 
partition managers

----

