[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360022#comment-14360022 ]

ASF GitHub Bot commented on STORM-643:
--

Github user 2new commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-78835043

+1

> KafkaUtils repeat fetch messages which offset is out of range
> -------------------------------------------------------------
>
> Key: STORM-643
> URL: https://issues.apache.org/jira/browse/STORM-643
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 0.9.2-incubating, 0.9.3
> Reporter: Xin Wang
> Assignee: Xin Wang
> Priority: Minor
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet<Long> failed) is not empty and some
> offset in it is out of range.
>
> [worker-log]
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> ...
>
> [FIX]
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is out of range
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
>
> Also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
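The failure mode in the worker log above is an infinite loop: fill() always picks the oldest failed offset first, Kafka answers that it is out of range, the spout resets its emit position, but the stale offset stays in the failed set and is picked again. The sketch below is a minimal JDK-only simulation of that loop, not Storm code; the names (fillUntilProgress, nextOffset, EARLIEST_VALID) are invented for illustration:

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class RetryLoopSketch {
    // Broker's earliest available offset, taken from the worker log above.
    static final long EARLIEST_VALID = 20996130717L;

    // PartitionManager.fill() picks the oldest failed offset first,
    // otherwise the normal emitted-to offset.
    static long nextOffset(SortedSet<Long> failed, long emittedToOffset) {
        return failed.isEmpty() ? emittedToOffset : failed.first();
    }

    // One "fill" iteration per loop pass: an out-of-range fetch is answered
    // by resetting to the broker's earliest offset. The patched variant also
    // drops the stale offset from the failed set, so the next pass progresses.
    static int fillUntilProgress(SortedSet<Long> failed, long emittedToOffset,
                                 boolean patched, int maxIterations) {
        int iterations = 0;
        while (iterations < maxIterations) {
            iterations++;
            long offset = nextOffset(failed, emittedToOffset);
            if (offset >= EARLIEST_VALID) {
                return iterations;            // fetch succeeds, loop ends
            }
            emittedToOffset = EARLIEST_VALID; // "Using new offset: ..."
            if (patched) {
                failed.remove(offset);        // the fix: forget the unreachable offset
            }
        }
        return iterations;                    // unpatched: never progresses
    }

    public static void main(String[] args) {
        SortedSet<Long> failed = new TreeSet<>();
        failed.add(20919071816L); // the out-of-range offset from the worker log
        // Unpatched: exhausts all 100 iterations without progress.
        System.out.println(fillUntilProgress(new TreeSet<>(failed), 20996130717L, false, 100));
        // Patched: drops the offset on pass 1, fetches successfully on pass 2.
        System.out.println(fillUntilProgress(new TreeSet<>(failed), 20996130717L, true, 100));
    }
}
```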
[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14338155#comment-14338155 ]

ASF GitHub Bot commented on STORM-643:
--

Github user vesense commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-76148389

@ptgoetz can you please review this. Thanks.
[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335965#comment-14335965 ]

ASF GitHub Bot commented on STORM-643:
--

Github user vesense commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-75902724

Hi Michael,

Thank you for your improvement. It would be better to skip all the outdated failed offsets in one step, and I have extended the pull request.

Best regards and thanks,
Xin Wang
[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333413#comment-14333413 ]

ASF GitHub Bot commented on STORM-643:
--

Github user pershyn commented on the pull request:
https://github.com/apache/storm/pull/405#issuecomment-75567762

Hi Xin,

Thank you for the good finding and the patch. I have applied the proposed patch on a pure storm-0.9.3 branch, deployed and tested it, and it does seem to fix the issue by skipping failed offsets.

However, when there are many failed messages with outdated offsets, the spout will slowly ask Kafka for each of them and discard the failed messages one at a time. In the logs this looks as follows (some warning records were added for debugging). Note that the "new offset" doesn't change:

```
2015-02-23T14:19:30.080+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410182]
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410182
2015-02-23T14:19:30.186+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410183]
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410183
2015-02-23T14:19:30.291+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411596]
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094411596
2015-02-23T14:19:30.396+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411837]
```

So I have added some logic to skip all the outdated failed offsets in one step; see the code snippet below:

```
// fix bug [STORM-643]: remove these offsets from the failed list when they are out of range
if (had_failed) {
    // For the EarliestTime case it is better to discard all failed offsets
    // that are earlier than the actual EarliestTime offset, since they are
    // no longer on the broker anyway. The corresponding broker API calls
    // are then saved.
    // For LatestTime it is debatable whether we should still try to reach
    // the failed offsets (they may still be available). But by moving to
    // LatestTime we are already discarding messages in the Kafka queue;
    // since it is configured that way, assume the user accepts losing
    // information and cares about the newest messages first. It then makes
    // sense not to make an exception for failed offsets and to discard
    // them as well.
    SortedSet<Long> omitted = failed.headSet(_emittedToOffset);
    // Use the tail, since a SortedSet maintains its elements in ascending
    // order. tailSet would impose a range on the original set, so we could
    // no longer add offsets that fall outside that range. For that reason
    // we copy the tail into a new set, where no range is set.
    failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
    LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
}
```

The outdated offsets are then skipped at once, and we save several hundred calls to the Kafka API:

```
2015-02-23T15:07:21.913+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [8786892024]
2015-02-23T15:07:21.915+0100 s.k.PartitionManager [WARN] Using new offset: 8789372723
2015-02-23T15:07:21.916+0100 s.k.PartitionManager [WARN] Removing the failed offsets that are out of range: [8786892024, 8786892114, 8786892125, 8786892127, 8786892170, 8786892207, 8786892217, 8786892317, 8786892405, 8786892444, 8786892453, 8786892469, 8786892478, 8786892549, 8786892614, 8786892667, 8786892918, /* ... some omitted ... */ 8786977572, 8786977944, 8786986501, 8786991794, 8786991797, 8786994497, 8787001536]
2015-02-23T15:07:32.759+0100 s.k.ZkCoordinator [INFO] Task [7/8] Refreshing partition manager connections
```

If you agree with this improvement, I would be happy if you extend the pull request with this logic.

Best regards and thanks,
Michael
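The copy into a new TreeSet in pershyn's snippet is the subtle part: the views returned by TreeSet.headSet and tailSet are range-restricted, so adding an element outside the range throws IllegalArgumentException, while a fresh TreeSet built from the view accepts any offset again. A small JDK-only demonstration (the offsets are made up; none of this is Storm code):

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class TailSetCopy {
    public static void main(String[] args) {
        SortedSet<Long> failed = new TreeSet<>();
        failed.add(100L);
        failed.add(200L);
        failed.add(300L);

        long emittedToOffset = 250L; // pretend the broker's earliest offset moved here

        // Offsets below the new start offset: gone from the broker, log and drop.
        SortedSet<Long> omitted = failed.headSet(emittedToOffset);
        System.out.println("omitted: " + omitted); // [100, 200]

        // A tailSet view keeps its range restriction: adding a smaller
        // element later (e.g. a newly failed offset) would throw.
        SortedSet<Long> view = failed.tailSet(emittedToOffset);
        try {
            view.add(50L); // below the view's lower bound
        } catch (IllegalArgumentException e) {
            System.out.println("view rejects out-of-range add");
        }

        // Copying into a new TreeSet removes the restriction, so the set can
        // keep recording failed offsets of any value afterwards.
        SortedSet<Long> copy = new TreeSet<>(failed.tailSet(emittedToOffset));
        copy.add(50L); // fine on the copy
        System.out.println("copy: " + copy); // [50, 300]
    }
}
```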
[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300943#comment-14300943 ]

ASF GitHub Bot commented on STORM-643:
--

GitHub user vesense opened a pull request:
https://github.com/apache/storm/pull/405

Update PartitionManager.java

fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm 0.9.3-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/405.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #405

commit 9280a948efaf28ab4de019060435d46731abd375
Author: vesense
Date: 2015-02-02T06:24:14Z

    Update PartitionManager.java
    fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range