Jonathan Munz created STORM-3337:
------------------------------------

             Summary: KafkaTridentSpoutOpaque can lose offset data in Zookeeper
                 Key: STORM-3337
                 URL: https://issues.apache.org/jira/browse/STORM-3337
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-core, storm-kafka-client
    Affects Versions: 1.2.3
            Reporter: Jonathan Munz
         Attachments: lost_offsets.log

I see this issue happening once or twice a week for a number of topologies I
have running in production that read from Kafka. I was able to reproduce it in
a more pared-down environment using 1 worker with a parallelism hint of at
least 2, reading from a topic with 16 partitions. The issue is also
reproducible with fewer partitions, but it occurs less frequently.

What happens is that, while committing offsets for the first transaction after
a worker crash, the partition offset data in ZooKeeper is wiped for a subset of
that worker's partitions. The state is restored after the next batch or two is
committed and the worker continues as normal. However, if the worker happens to
crash a second time before the data is restored, the offsets are lost and those
partitions reset after the worker restarts (in my case to their earliest
offsets, since I used the UNCOMMITTED_EARLIEST reset strategy).
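
A simplified, hypothetical model of why the missing nodes turn into a reset:
if a partition has no committed offset left in ZK when the worker restarts, an
UNCOMMITTED_* strategy has nothing to resume from and falls back to the
earliest (or latest) offset. Only the two UNCOMMITTED_* strategy names are
mirrored here; `ResetModel` and `startOffset` are made up for illustration and
are not the storm-kafka-client API.

```java
import java.util.OptionalLong;

public class ResetModel {
    // Only the two UNCOMMITTED_* strategies are modeled (simplified, hypothetical).
    enum Strategy { UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // Hypothetical: where a partition resumes after a worker restart.
    // lastCommitted is empty when the partition's ZK node has no children.
    static long startOffset(OptionalLong lastCommitted, Strategy strategy,
                            long earliest, long latest) {
        if (lastCommitted.isPresent()) {
            return lastCommitted.getAsLong() + 1; // resume after the last committed offset
        }
        // Nothing committed: fall back to the strategy's starting point.
        return strategy == Strategy.UNCOMMITTED_EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.of(41L), Strategy.UNCOMMITTED_EARLIEST, 0L, 100L));
        System.out.println(startOffset(OptionalLong.empty(), Strategy.UNCOMMITTED_EARLIEST, 0L, 100L));
    }
}
```

With `lastCommitted` present the partition resumes where it left off; once the
wiped partitions lose their last node, UNCOMMITTED_EARLIEST sends them back to
the earliest offset, which matches the reset I observed.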

I've attached a log file showing what's happening. In this example, ZK had
offset data committed for all partitions for txid=29 before the worker crashed.
After the worker came back up, partitions 1, 3, 5, 7, 9, 11, 13 and 15 lost
their child nodes in ZK, while the remaining partitions got child nodes created
for txid=30. The important steps are:

1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here: 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146](https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146)
 
Thread-7 is assigned even-numbered partitions 0-14 and has `_index=0`,
`_changedMeta=true`, `coordinatorMeta=[]`.
Thread-19 is assigned odd-numbered partitions 1-15 and has `_index=1`,
`_changedMeta=true`, `coordinatorMeta=[]`.
Even though `coordinatorMeta` is empty, `KafkaTridentSpoutEmitter` ignores
this parameter in `getPartitionsForTask` and still returns partition
assignments for each thread:
https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253

2. Both threads hit 'Committing transaction' here: 
https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160
 
3. Thread-19 begins creating state for txid=30 for its partitions:
https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186

4. Thread-7 enters this special-case branch, since it has `_index==0` and
`_changedMeta==true`:
https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169

5. Thread-7 calls `_emitter.getOrderedPartitions(_savedCoordinatorMeta)`,
which for the `KafkaTridentSpoutEmitter` implementation returns an empty list,
since `coordinatorMeta` was empty for this batch:
https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241

6. Thread-7 lists all the partition nodes for this component in ZK. Since
`validIds` is empty, every node passes the check and `removeState` is called on
each of them for txid=30:
https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177
This is the key step that causes the bug. Thread-7 hasn't started committing
state for txid=30 for any of its assigned partitions, so the calls to
`removeState` on those partitions do nothing. However, Thread-19 is running
concurrently, so any partitions it has already written state for get deleted
here.
 
7. Thread-7 and Thread-19 enter the success handler and call 
`cleanupBefore(txid=29)`:
https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153
For the partitions owned by Thread-19 whose state was removed in step 6, this
means the nodes for both txid=29 and txid=30 are deleted, leaving those
partitions with no child nodes.
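
The interleaving above can be replayed deterministically against an in-memory
stand-in for the ZK tree. This is a hypothetical model, not Storm code: `zk`
maps each partition to the txids that still have a child node, and
`createState`, `removeState` and `cleanupBefore` are simplified stand-ins for
the state operations named in the steps. The step-7 cleanup is modeled as
deleting every node older than the txid that just succeeded (30), i.e. the
txid=29 node.

```java
import java.util.*;

public class LostOffsetsReplay {
    // Stand-in for ZK: partition -> txids that currently have a child node.
    static final Map<Integer, TreeSet<Long>> zk = new TreeMap<>();

    static void createState(int partition, long txid) {
        zk.computeIfAbsent(partition, k -> new TreeSet<>()).add(txid);
    }

    static void removeState(int partition, long txid) {
        TreeSet<Long> txids = zk.get(partition);
        if (txids != null) txids.remove(txid);
    }

    // Deletes every txid node strictly older than txid.
    static void cleanupBefore(int partition, long txid) {
        TreeSet<Long> txids = zk.get(partition);
        if (txids != null) txids.headSet(txid).clear();
    }

    static void replay() {
        zk.clear();
        List<Integer> even = new ArrayList<>();
        List<Integer> odd = new ArrayList<>();
        for (int p = 0; p < 16; p++) {
            createState(p, 29);                 // txid=29 committed before the crash
            (p % 2 == 0 ? even : odd).add(p);   // Thread-7 owns even, Thread-19 owns odd
        }
        // Step 3: Thread-19 writes txid=30 state for its partitions first.
        for (int p : odd) createState(p, 30);
        // Steps 5-6: Thread-7 builds validIds from an empty ordered-partition list...
        Set<Integer> validIds = new HashSet<>();
        // ...so every existing partition node fails the contains check.
        for (int p : new ArrayList<>(zk.keySet())) {
            if (!validIds.contains(p)) removeState(p, 30);
        }
        // Thread-7 then writes txid=30 for its own partitions.
        for (int p : even) createState(p, 30);
        // Step 7: both threads clean up nodes older than the txid that succeeded.
        for (int p : zk.keySet()) cleanupBefore(p, 30);
    }

    public static void main(String[] args) {
        replay();
        zk.forEach((p, txids) -> System.out.println("partition " + p + " -> " + txids));
    }
}
```

Running `main` ends with every odd partition empty and every even partition
holding only txid=30, which is the same split the attached log shows.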

A couple of things contribute to this bug. I'm not sure whether
`coordinatorMeta` is expected to be passed into `emitBatch` as an empty list
for the first batch after a worker restart. If it contained all the partitions
assigned across all tasks (as it does on subsequent batches), then `validIds`
would include the partitions owned by other tasks and their state wouldn't
accidentally be removed. This is exacerbated by the fact that
`KafkaTridentSpoutEmitter` returns valid partitions from `getPartitionsForTask`
even when `getOrderedPartitions` returns an empty list, which seems to break
expectations.
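
One possible mitigation for the empty-`coordinatorMeta` case, sketched as a
standalone helper: treat an empty ordered-partition list as "metadata not known
yet" and prune nothing, rather than treating every existing node as invalid.
`PruneGuard` and `partitionsToPrune` are hypothetical names; this is not a
tested patch for `OpaquePartitionedTridentSpoutExecutor`.

```java
import java.util.*;

public class PruneGuard {
    // Hypothetical guard: returns the partition ids whose state may be pruned.
    // An empty ordered-partition list is treated as "metadata not known yet",
    // so nothing is pruned instead of everything.
    static Set<String> partitionsToPrune(List<String> orderedPartitionIds,
                                         Collection<String> existingNodes) {
        if (orderedPartitionIds.isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> validIds = new HashSet<>(orderedPartitionIds);
        Set<String> prune = new TreeSet<>();
        for (String node : existingNodes) {
            if (!validIds.contains(node)) prune.add(node);
        }
        return prune;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("p0", "p1", "p2");
        System.out.println(partitionsToPrune(Collections.<String>emptyList(), nodes));
        System.out.println(partitionsToPrune(Arrays.asList("p0", "p1"), nodes));
    }
}
```

The trade-off is that a genuinely empty partition list would leave stale nodes
in place until a later batch, which seems less harmful than deleting live
offsets.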

One additional safeguard that might be worth investigating is having
`cleanupBefore` check, before running, that it won't leave a node in ZK
without any children.
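
A rough sketch of that safeguard, again on an in-memory set of txid nodes
rather than real ZK: skip the cleanup when no node at or after the current txid
would survive. `SafeCleanup` is a hypothetical name, and the check followed by
the delete here is not atomic, so against real ZK it would narrow the window
rather than close it.

```java
import java.util.*;

public class SafeCleanup {
    // Hypothetical safeguard: delete txid nodes older than `txid` only if at
    // least one node at or after `txid` remains, so the partition never ends
    // up with no children. Returns whether the cleanup ran.
    static boolean cleanupBefore(NavigableSet<Long> txids, long txid) {
        if (txids.tailSet(txid, true).isEmpty()) {
            return false; // skip: cleanup would leave no children
        }
        txids.headSet(txid, false).clear();
        return true;
    }

    public static void main(String[] args) {
        NavigableSet<Long> healthy = new TreeSet<>(Arrays.asList(29L, 30L));
        NavigableSet<Long> damaged = new TreeSet<>(Arrays.asList(29L)); // txid=30 node was removed
        System.out.println(cleanupBefore(healthy, 30) + " " + healthy);
        System.out.println(cleanupBefore(damaged, 30) + " " + damaged);
    }
}
```

For the damaged partition the call returns `false` and keeps the txid=29 node,
so the partition still has offsets to resume from if the worker crashes again.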



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
