[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang updated STORM-643:
---------------------------
    Priority: Minor  (was: Critical)

> KafkaUtils repeat fetch messages which offset is out of range
> -------------------------------------------------------------
>
>                 Key: STORM-643
>                 URL: https://issues.apache.org/jira/browse/STORM-643
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.9.2-incubating, 0.9.3
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>            Priority: Minor
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and
> some offset in it is out of range.
>
> [worker-log]
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [20919071816]; retrying with default start offset time
> from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset:
> 20996130717
> ...
>
> [FIX]
> storm.kafka.PartitionManager.fill():
> ...
>     try {
>         msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition,
>                 offset);
>     } catch (UpdateOffsetException e) {
>         _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
>                 _partition.partition, _spoutConfig);
>         LOG.warn("Using new offset: {}", _emittedToOffset);
>         // fetch failed, so don't update the metrics
>         // fix: remove this offset from the failed list when it is out of range
>         if (had_failed) {
>             failed.remove(offset);
>         }
>         return;
>     }
> ...
>
> Also: the log message "retrying with default start offset time from
> configuration. configured start offset time: [-2]" is incorrect.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
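
For reference, the retry loop reported in this issue can be reproduced in isolation. The sketch below is a simplified model, not the real storm-kafka code: the class FailedOffsetSketch, its fields, and the helper countOutOfRangeFetches are hypothetical names, and the broker is reduced to a single "earliest available offset" value. It only illustrates why an out-of-range offset left in the failed set is picked again on every fill() round, and how removing it in the out-of-range branch stops the loop.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for the spout's retry bookkeeping; not the real storm-kafka classes.
final class FailedOffsetSketch {
    private final SortedSet<Long> failed = new TreeSet<>();
    private final long earliestAvailable; // assumption: oldest offset the broker still holds
    private long emittedToOffset;

    FailedOffsetSketch(long earliestAvailable, long emittedToOffset) {
        this.earliestAvailable = earliestAvailable;
        this.emittedToOffset = emittedToOffset;
    }

    void fail(long offset) {
        failed.add(offset);
    }

    /** Runs one fill() round; returns true if the fetch hit an out-of-range offset. */
    boolean fill(boolean removeOnOutOfRange) {
        boolean hadFailed = !failed.isEmpty();
        // Failed offsets take priority over the normal read position.
        long offset = hadFailed ? failed.first() : emittedToOffset;

        if (offset < earliestAvailable) {
            // Models the out-of-range branch: reset the read position.
            emittedToOffset = earliestAvailable;
            if (hadFailed && removeOnOutOfRange) {
                // The proposed fix: drop the offset that can never be fetched.
                failed.remove(offset);
            }
            return true;
        }

        // Simplification: a successful fetch clears the retry or advances by one.
        if (hadFailed) {
            failed.remove(offset);
        } else {
            emittedToOffset = offset + 1;
        }
        return false;
    }

    /** Counts how many of the given rounds end in an out-of-range fetch. */
    static int countOutOfRangeFetches(boolean removeOnOutOfRange, int rounds) {
        FailedOffsetSketch pm = new FailedOffsetSketch(100L, 150L);
        pm.fail(42L); // an offset the broker has already discarded
        int count = 0;
        for (int i = 0; i < rounds; i++) {
            if (pm.fill(removeOnOutOfRange)) {
                count++;
            }
        }
        return count;
    }
}
```

Calling countOutOfRangeFetches(false, n) models the behaviour before the fix, where every round retries the same stale offset; countOutOfRangeFetches(true, n) models the patched branch, where the stale offset is attempted once and then dropped.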