[
https://issues.apache.org/jira/browse/STORM-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stig Rohde Døssing closed STORM-3337.
-------------------------------------
Resolution: Duplicate
Sounds good. I'm going to close this issue as covered by STORM-2691. Please
reopen if it turns out 2.0.0 doesn't resolve the issue for you.
> KafkaTridentSpoutOpaque can lose offset data in Zookeeper
> ---------------------------------------------------------
>
> Key: STORM-3337
> URL: https://issues.apache.org/jira/browse/STORM-3337
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core, storm-kafka-client
> Affects Versions: 1.2.3
> Reporter: Jonathan Munz
> Priority: Major
> Attachments: lost_offsets.log
>
>
> I see this issue happening once or twice a week for a number of topologies I
> have running in production that read from Kafka. I was able to reproduce it in
> a more pared-down environment using 1 worker with a parallelism hint of at
> least 2, reading from a topic with 16 partitions. The issue is reproducible
> with fewer partitions, but it occurs less frequently.
> What happens is that while committing offsets for the first transaction after
> a worker crash, the partition offset data in Zookeeper is wiped for a subset
> of that worker's partitions. The state is restored after the next batch or two
> is committed and the worker continues as normal. However, if the worker
> happens to crash a second time before the data restores itself, the data is
> lost and those partitions reset after the worker restarts again (in my case to
> their earliest offsets, since I used a reset strategy of UNCOMMITTED_EARLIEST).
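> For reference, a pared-down reproduction topology looks roughly like the
> sketch below. The broker address, topic, and topology names are placeholders,
> and the `KafkaTridentSpoutOpaque` constructor may differ slightly between 1.x
> releases.
> ```java
> import org.apache.storm.Config;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.StormTopology;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.TridentTopology;
>
> public class LostOffsetsRepro {
>
>     public static StormTopology buildTopology() {
>         KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
>                 .builder("kafka:9092", "sixteen-partition-topic")
>                 .setFirstPollOffsetStrategy(
>                         KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>                 .build();
>
>         TridentTopology topology = new TridentTopology();
>         // A parallelism hint of at least 2 gives multiple emitter tasks in the
>         // single worker, which is what makes the race reproducible.
>         topology.newStream("kafka-stream", new KafkaTridentSpoutOpaque<>(spoutConfig))
>                 .parallelismHint(2)
>                 .peek(tuple -> { }); // no-op sink so the stream has a consumer
>         return topology.build();
>     }
>
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setNumWorkers(1);
>         // Killing the worker twice in quick succession is what loses the offsets.
>         StormSubmitter.submitTopology("lost-offsets-repro", conf, buildTopology());
>     }
> }
> ```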
> I've attached a log file showing what's happening. In this example ZK had
> offset data committed for all partitions for txid=29 before the worker
> crashed. After the worker came back up, partitions 1, 3, 5, 7, 9, 11, 13 and
> 15 lost their child nodes in ZK, while the remaining partitions got child
> nodes created for txid=30. The important steps are:
> 1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here:
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146]
> Thread-7 is assigned the even-numbered partitions 0-14 and has _index=0,
> _changedMeta=true, coordinatorMeta=[].
> Thread-19 is assigned the odd-numbered partitions 1-15 and has _index=1,
> _changedMeta=true, coordinatorMeta=[].
> Even though `coordinatorMeta` is empty, the `KafkaTridentSpoutEmitter` ignores
> this parameter in `getPartitionsForTask` and still returns partition
> assignments for each thread:
>
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253]
> 2. Both threads hit 'Committing transaction' here:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160]
> 3. Thread-19 begins creating state for txid=30 for its partitions:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186]
> 4. Thread-7 enters this special condition since it has `_index==0` and
> `_changedMeta==true`:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169]
> 5. Thread-7 calls `_emitter.getOrderedPartitions(_savedCoordinatorMeta)`,
> which for the KafkaTridentSpoutEmitter implementation returns an empty list
> since `coordinatorMeta` was empty for this batch:
>
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241]
> 6. Thread-7 lists all the partition nodes for this component in ZK, and since
> `validIds` is empty none of them are in it, so `removeState` is called on each
> of them for txid=30:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177]
> This is the key part that causes the bug. Thread-7 hasn't started committing
> state for txid=30 for any of its assigned partitions, so the calls to
> `removeState` on those partitions won't do anything. However, Thread-19 is
> running concurrently, so any partitions it has already written state for will
> get deleted here (see the toy simulation after step 7 below).
> 7. Thread-7 and Thread-19 enter the success handler and call
> `cleanupBefore(txid=29)`:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153]
> For the partitions owned by Thread-19 that had their state removed in #6,
> that means both the nodes for 29 and 30 are deleted and the partitions'
> states are left empty in ZK.
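> To make the interleaving concrete, here is a toy simulation of steps 3-7. It
> uses plain in-memory maps in place of Storm's state classes and ZooKeeper, so
> it only mirrors the behaviour described above rather than exercising the real
> code:
> ```java
> import java.util.ArrayList;
> import java.util.List;
> import java.util.TreeMap;
>
> public class OffsetWipeSimulation {
>     // Stand-in for ZK: partition id -> (txid -> committed offset metadata)
>     static final TreeMap<Integer, TreeMap<Long, String>> zk = new TreeMap<>();
>
>     public static void main(String[] args) {
>         // Before the crash every partition has state committed for txid=29.
>         for (int p = 0; p < 16; p++) {
>             zk.computeIfAbsent(p, k -> new TreeMap<>()).put(29L, "offsets@29");
>         }
>
>         // Step 3: Thread-19 starts committing state for txid=30 for its odd partitions.
>         for (int p = 1; p < 16; p += 2) {
>             zk.get(p).put(30L, "offsets@30");
>         }
>
>         // Steps 4-6: Thread-7 has _index==0 and _changedMeta==true, but its
>         // coordinatorMeta is empty, so validIds is empty and every listed
>         // partition gets removeState for txid=30.
>         List<Integer> validIds = new ArrayList<>(); // empty on the first batch after restart
>         for (int partition : zk.keySet()) {
>             if (!validIds.contains(partition)) {
>                 zk.get(partition).remove(30L); // no-op for Thread-7's partitions, wipes Thread-19's
>             }
>         }
>
>         // Thread-7 then commits its own (even) partitions for txid=30 as usual.
>         for (int p = 0; p < 16; p += 2) {
>             zk.get(p).put(30L, "offsets@30");
>         }
>
>         // Step 7: the success handlers clean up the older txids, removing the
>         // txid=29 children as well.
>         for (int partition : zk.keySet()) {
>             zk.get(partition).headMap(30L).clear();
>         }
>
>         // Even partitions keep {30=offsets@30}; odd partitions are now empty,
>         // so a second crash at this point resets them.
>         System.out.println(zk);
>     }
> }
> ```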
> A couple of things contribute to this bug. I'm not sure whether it's expected
> for `coordinatorMeta` to be passed into `emitBatch` as an empty list for the
> first batch after the worker restarts. If it contained all the partitions
> assigned across all tasks (which it does on subsequent batches), then the
> partitions owned by other tasks would be in `validIds` and their state
> wouldn't accidentally get removed. This is exacerbated by the fact that the
> `KafkaTridentSpoutEmitter` returns valid partitions from `getPartitionsForTask`
> even when `getOrderedPartitions` returns an empty list, which seems to break
> the executor's expectations.
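> The following is only a paraphrase of that mismatch, with simplified
> signatures and partitions as plain ints, not the actual emitter source:
> ```java
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
>
> class EmitterContractSketch {
>     // Stand-in for the consumer's live assignment (this task's partitions).
>     private final List<Integer> consumerAssignment = Arrays.asList(0, 2, 4, 6, 8, 10, 12, 14);
>
>     // Derives its answer purely from the coordinator metadata, so an empty
>     // coordinatorMeta yields an empty ordered-partition list ...
>     List<Integer> getOrderedPartitions(List<Integer> coordinatorMeta) {
>         return new ArrayList<>(coordinatorMeta);
>     }
>
>     // ... while this ignores coordinatorMeta and reports the consumer's live
>     // assignment, so the task still believes it owns partitions. The executor
>     // assumes the two views agree, and that assumption is what breaks here.
>     List<Integer> getPartitionsForTask(int taskId, int numTasks, List<Integer> coordinatorMeta) {
>         return new ArrayList<>(consumerAssignment);
>     }
> }
> ```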
> One additional safeguard that might be worth investigating is having
> `cleanupBefore` make sure it isn't about to leave a node in ZK without any
> children before it deletes anything.
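> A rough sketch of that guard, where the fields and the list/delete calls only
> approximate the shape of Storm's RotatingTransactionalState/TransactionalState
> rather than quote the real source:
> ```java
> import java.util.ArrayList;
> import java.util.List;
> import java.util.SortedMap;
> import java.util.TreeMap;
>
> class GuardedRotatingState {
>     private final TreeMap<Long, Object> _curr = new TreeMap<>();
>     private final ZkLikeState _state; // stand-in for the ZK-backed TransactionalState
>     private final String _subdir;     // this partition's node path
>
>     GuardedRotatingState(ZkLikeState state, String subdir) {
>         _state = state;
>         _subdir = subdir;
>     }
>
>     public void cleanupBefore(long txid) {
>         SortedMap<Long, Object> toDelete = _curr.headMap(txid);
>         // Re-list this partition's children in ZK before deleting: another task
>         // may already have removed the newer txids (step 6 above), and deleting
>         // the older ones as well would leave the node with no children at all.
>         int remainingChildren = _state.list(_subdir).size();
>         if (remainingChildren <= toDelete.size()) {
>             return;
>         }
>         for (long tx : new ArrayList<>(toDelete.keySet())) {
>             _curr.remove(tx);
>             _state.delete(_subdir + "/" + tx);
>         }
>     }
>
>     // Minimal stand-in for the ZK-backed state the real class delegates to.
>     interface ZkLikeState {
>         List<String> list(String path);
>         void delete(String path);
>     }
> }
> ```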
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)