[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range

2015-03-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360022#comment-14360022 ]

ASF GitHub Bot commented on STORM-643:
--

Github user 2new commented on the pull request:

https://github.com/apache/storm/pull/405#issuecomment-78835043
  
+1


> KafkaUtils repeat fetch messages which offset is out of range
> -------------------------------------------------------------
>
> Key: STORM-643
> URL: https://issues.apache.org/jira/browse/STORM-643
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 0.9.2-incubating, 0.9.3
>Reporter: Xin Wang
>Assignee: Xin Wang
>Priority: Minor
>
> KafkaUtils repeatedly fetches messages whose offset is out of range.
> This happens when the failed list (SortedSet failed) is not empty and some
> offset in it is OutOfRange.
> [worker-log]
> 2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> 2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
> 2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
> ...
> [FIX]
> storm.kafka.PartitionManager.fill():
> ...
> try {
>     msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
> } catch (UpdateOffsetException e) {
>     _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
>     LOG.warn("Using new offset: {}", _emittedToOffset);
>     // fetch failed, so don't update the metrics
>     // fix bug: remove this offset from the failed list when it is OutOfRange
>     if (had_failed) {
>         failed.remove(offset);
>     }
>     return;
> }
> ...
> Also: the log message "retrying with default start offset time from
> configuration. configured start offset time: [-2]" is incorrect.
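
To make the failure mode above concrete, here is a small self-contained toy model of the retry loop (hypothetical class and variable names; this is a sketch, not the real storm-kafka code). It reproduces the endless retry and shows why removing the expired offset from the failed set breaks the loop:

```java
import java.util.TreeSet;

public class RetryLoopDemo {
    static final TreeSet<Long> failed = new TreeSet<>();
    static final long EARLIEST_AVAILABLE = 20996130717L; // broker's earliest retained offset

    // Models one call to PartitionManager.fill(): retry the oldest failed
    // offset first, otherwise continue from the earliest available offset.
    static void fill(boolean removeOnOutOfRange) {
        boolean hadFailed = !failed.isEmpty();
        long offset = hadFailed ? failed.first() : EARLIEST_AVAILABLE;
        if (offset < EARLIEST_AVAILABLE) { // models an OffsetOutOfRange response
            System.out.println("WARN Got fetch request with offset out of range: [" + offset + "]");
            System.out.println("WARN Using new offset: " + EARLIEST_AVAILABLE);
            if (removeOnOutOfRange && hadFailed) {
                failed.remove(offset); // the STORM-643 fix
            }
            return;
        }
        System.out.println("fetched messages starting at offset " + offset);
    }

    public static void main(String[] args) {
        failed.add(20919071816L); // a failed offset the broker has already expired
        fill(false); // without the fix: prints the WARN pair ...
        fill(false); // ... and the identical WARN pair again, forever
        fill(true);  // with the fix: drops the expired offset
        fill(true);  // progress resumes from the new offset
    }
}
```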



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range

2015-02-26 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14338155#comment-14338155 ]

ASF GitHub Bot commented on STORM-643:
--

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/405#issuecomment-76148389
  
@ptgoetz can you please review this. Thanks.




[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range

2015-02-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335965#comment-14335965 ]

ASF GitHub Bot commented on STORM-643:
--

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/405#issuecomment-75902724
  
Hi Michael,

Thank you for your improvement.

It is indeed better to skip all the outdated failed offsets in one step, and
I have extended the pull request accordingly.

Best regards and Thanks,
Xin Wang




[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range

2015-02-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333413#comment-14333413 ]

ASF GitHub Bot commented on STORM-643:
--

Github user pershyn commented on the pull request:

https://github.com/apache/storm/pull/405#issuecomment-75567762
  
Hi Xin,

Thank you for the good catch and the patch.

I have applied the proposed patch on a pure storm-0.9.3 branch, deployed and
tested it, and it does indeed seem to fix the issue by skipping failed
offsets. But when there are lots of failed messages with outdated offsets,
the spout slowly asks Kafka for each of them and discards the failed
messages one at a time.

In the logs this looks like the following (some log warning records were
added for debugging purposes). Note that the "new offset" doesn't change.

```
2015-02-23T14:19:30.080+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410182]
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410182
2015-02-23T14:19:30.186+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410183]
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410183
2015-02-23T14:19:30.291+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411596]
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094411596
2015-02-23T14:19:30.396+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411837]
```

So, I have added some logic to skip all the outdated failed offsets in one 
step, see code snippet below:

```
// fix bug [STORM-643]: remove these offsets from the failed list when they are OutOfRange
if (had_failed) {
    // For the case of EarliestTime it is better to discard all the failed
    // offsets that are earlier than the actual EarliestTime offset, since
    // they are no longer on the broker anyway. The broker API calls that
    // would otherwise be made for each of them are then saved.

    // In the case of LatestTime it is a question whether we should still
    // try to reach the failed offsets (they may still be available). But
    // by moving to LatestTime we are already discarding messages in the
    // Kafka queue. Since it is configured so, assume the user accepts
    // losing information and cares about the newest messages first, so it
    // makes sense not to make an exception for the failed offsets and to
    // discard them as well.

    SortedSet<Long> omitted = failed.headSet(_emittedToOffset);

    // A SortedSet maintains its elements in ascending order, so the tail
    // holds the offsets that are still in range. tailSet() returns a view
    // with a range restriction on the original set, so we could not later
    // add offsets that fall outside that range. For that reason we copy
    // the tail into a new set that carries no range restriction.
    failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
    LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
}
```
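
The range restriction those comments describe is standard java.util.SortedSet behavior, easy to verify in isolation. A minimal standalone sketch (hypothetical class name) shows why the copy into a fresh TreeSet is needed:

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class TailSetRangeDemo {
    public static void main(String[] args) {
        SortedSet<Long> failed = new TreeSet<>();
        failed.add(10L);
        failed.add(20L);
        failed.add(30L);

        // tailSet(20L) is a *view* restricted to elements >= 20.
        SortedSet<Long> tail = failed.tailSet(20L);
        try {
            tail.add(5L); // below the view's lower bound
        } catch (IllegalArgumentException e) {
            System.out.println("the view rejects out-of-range adds: " + e.getMessage());
        }

        // Copying the view into a fresh TreeSet drops the range restriction,
        // which is why the patch assigns failed = new TreeSet<Long>(failed.tailSet(...)).
        SortedSet<Long> copy = new TreeSet<>(tail);
        copy.add(5L); // fine: the copy carries no range
        System.out.println(copy); // prints [5, 20, 30]
    }
}
```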

The outdated offsets are then skipped all at once, saving up to several
hundred calls to the Kafka API:

```
2015-02-23T15:07:21.913+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [8786892024]
2015-02-23T15:07:21.915+0100 s.k.PartitionManager [WARN] Using new offset: 8789372723
2015-02-23T15:07:21.916+0100 s.k.PartitionManager [WARN] Removing the failed offsets that are out of range: [8786892024, 8786892114, 8786892125, 8786892127, 8786892170, 8786892207, 8786892217, 8786892317, 8786892405, 8786892444, 8786892453, 8786892469, 8786892478, 8786892549, 8786892614, 8786892667, 8786892918, /* ... some omitted ... */ 8786977572, 8786977944, 8786986501, 8786991794, 8786991797, 8786994497, 8787001536]
2015-02-23T15:07:32.759+0100 s.k.ZkCoordinator [INFO] Task [7/8] Refreshing partition manager connections
```

If you agree with this improvement, I would be happy if you extended the
pull request with this logic.

Best regards and Thanks,
Michael



[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range

2015-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300943#comment-14300943 ]

ASF GitHub Bot commented on STORM-643:
--

GitHub user vesense opened a pull request:

https://github.com/apache/storm/pull/405

Update PartitionManager.java

fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out
of range

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vesense/storm 0.9.3-branch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #405


commit 9280a948efaf28ab4de019060435d46731abd375
Author: vesense 
Date:   2015-02-02T06:24:14Z

Update PartitionManager.java

fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out
of range



