[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14502768#comment-14502768 ]
ASF GitHub Bot commented on STORM-643:
--------------------------------------

Github user tpiscitell commented on the pull request:

https://github.com/apache/storm/pull/405#issuecomment-94448772

@miguno I don't think either of those JIRAs fixed this issue. The problem here is that PartitionManager.fill() will always attempt to fetch any failed tuples first:

```
// Are there failed tuples? If so, fetch those first.
if (had_failed) {
    offset = failed.first();
} else {
    offset = _emittedToOffset;
}
```

However, even with the patches for those two JIRAs, the `failed` list is never pruned. Instead, PartitionManager.fill() just updates `_emittedToOffset` and returns:

```
} catch (TopicOffsetOutOfRangeException e) {
    _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
            _partition.partition, _spoutConfig);
    LOG.warn("Using new offset: {}", _emittedToOffset);
    // fetch failed, so don't update the metrics
    return;
}
```

fill() is then invoked again on the next call to next(), and the process repeats:

```
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
```

> KafkaUtils repeatedly fetches messages whose offset is out of range
> -------------------------------------------------------------------
>
>                 Key: STORM-643
>                 URL: https://issues.apache.org/jira/browse/STORM-643
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.2-incubating, 0.9.3
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>            Priority: Minor
>
> KafkaUtils repeatedly fetches messages whose offset is out of range. This
> happens when the failed list (SortedSet<Long> failed) is not empty and some
> offset in it is out of range.
> {code}
> [worker-log]
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration.
> configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> ...
> {code}
> [FIX]
> {code}
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition,
>             offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
>             _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is out of range
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
> {code}
> Also: the log message "retrying with default start offset time from
> configuration. configured start offset time: [-2]" is incorrect.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
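The retry loop described in the comment can be illustrated with a small standalone sketch. This is a hypothetical, simplified model, not Storm's actual PartitionManager: the class `OffsetPruneDemo`, the method `resolveOffset`, and its parameters are all invented for illustration. It only mimics the offset-selection logic quoted above (take `failed.first()` if present, else `_emittedToOffset`; on an out-of-range offset, reset to the earliest valid offset) and shows that without pruning the stale failed offset is re-selected forever, while with the proposed `failed.remove(offset)` the spout advances.

```java
import java.util.TreeSet;

public class OffsetPruneDemo {

    // Simplified model of PartitionManager.fill()'s offset selection.
    // Returns the first offset that would fetch successfully, or -1 if the
    // loop never converges (the repeated-refetch bug).
    static long resolveOffset(TreeSet<Long> failed, long emittedToOffset,
                              long earliestValidOffset, boolean pruneOnOutOfRange) {
        for (int attempts = 0; attempts < 10; attempts++) {
            // "Are there failed tuples? If so, fetch those first."
            long offset = failed.isEmpty() ? emittedToOffset : failed.first();
            if (offset >= earliestValidOffset) {
                return offset; // fetch would succeed
            }
            // Simulates TopicOffsetOutOfRangeException handling:
            // reset _emittedToOffset to the broker's earliest valid offset.
            emittedToOffset = earliestValidOffset;
            if (pruneOnOutOfRange) {
                failed.remove(offset); // the fix proposed in the issue
            }
            // without pruning, failed.first() yields the same stale offset again
        }
        return -1;
    }

    public static void main(String[] args) {
        // Offsets taken from the worker log in the issue description.
        TreeSet<Long> failed = new TreeSet<>();
        failed.add(20919071816L);                // expired failed offset
        long earliest = 20996130717L;            // broker's earliest valid offset

        System.out.println("without prune: "
                + resolveOffset(new TreeSet<>(failed), 21000000000L, earliest, false));
        System.out.println("with prune: "
                + resolveOffset(new TreeSet<>(failed), 21000000000L, earliest, true));
    }
}
```

Without pruning, the stale offset 20919071816 stays at the head of the failed set, so every iteration re-selects it and the loop never terminates; with pruning, the next iteration falls through to the reset `emittedToOffset` and fetching resumes.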