[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-10 Thread Prasanna Ranganathan (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045636#comment-16045636
 ] 

Prasanna Ranganathan commented on STORM-2343:
-

Awesome. Will take a look.

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> ---
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.
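
For illustration, here is a minimal, self-contained sketch of the gating behavior described above. The names (shouldPoll, the plain int counter) and the simplified flow are assumptions for this example; this is not the actual KafkaSpout code.

{code:java}
// Sketch of the stall: the poll gate only checks the counter *before* polling,
// but a single poll can push the counter far past the cap.
public class UncommittedOffsetStallSketch {
    static final int MAX_UNCOMMITTED_OFFSETS = 100;
    static final int MAX_POLL_RECORDS = 500; // Kafka's default max.poll.records

    static int numUncommittedOffsets = 0;

    // The spout only polls (and only schedules retries) while under the cap.
    static boolean shouldPoll() {
        return numUncommittedOffsets < MAX_UNCOMMITTED_OFFSETS;
    }

    public static void main(String[] args) {
        if (shouldPoll()) {
            // A single poll may return up to max.poll.records records, all of which
            // are added to waitingToEmit and emitted, overshooting the cap.
            numUncommittedOffsets += MAX_POLL_RECORDS;
        }
        // If more than MAX_UNCOMMITTED_OFFSETS of these tuples now fail, they stay
        // uncommitted until retried, but retries are only fetched via poll, so the
        // counter never drops under the cap and the spout stops polling entirely.
        System.out.println("uncommitted=" + numUncommittedOffsets
                + ", willPollAgain=" + shouldPoll()); // uncommitted=500, willPollAgain=false
    }
}
{code}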





[jira] [Assigned] (STORM-2546) Kafka spout can stall / get stuck due to edge case with failing tuples

2017-06-10 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reassigned STORM-2546:
-

Assignee: Stig Rohde Døssing

> Kafka spout can stall / get stuck due to edge case with failing tuples
> --
>
> Key: STORM-2546
> URL: https://issues.apache.org/jira/browse/STORM-2546
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.x
>Reporter: Prasanna Ranganathan
>Assignee: Stig Rohde Døssing
>
> The mechanism for replaying a failed tuple involves seeking the kafka 
> consumer to the failing offset and then re-emitting it into the topology. A 
> tuple, when emitted the first time, will have an entry created in 
> OffsetManager. This entry will be removed only after the tuple is 
> successfully acknowledged and its offset successfully committed. Till then, 
> commits for offsets beyond the failing offset for that TopicPartition will be 
> blocked.
> It is possible that when the spout seeks the consumer to the failing offset, 
> the corresponding kafka message is not returned in the poll response. This 
> can happen due to that offset being deleted or compacted away. In this 
> scenario that partition will be blocked from committing and progressing.
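
For illustration, a self-contained sketch of the commit gating described above. The helper below is an assumption for this example, not the actual OffsetManager implementation; it only shows why a single missing (compacted) offset pins the committed offset in place.

{code:java}
import java.util.TreeSet;

public class CommitGateSketch {
    // The committed offset can only advance across a contiguous run of acked offsets;
    // it stops just before the first offset that has not been acked.
    static long nextCommitOffset(long committedOffset, TreeSet<Long> ackedOffsets) {
        long next = committedOffset;
        while (ackedOffsets.contains(next + 1)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>();
        // Offset 5 failed and was later compacted away, so seeking back to 5 never
        // returns a record and 5 is never acked. Offsets 6..100 ack normally.
        for (long offset = 6; offset <= 100; offset++) {
            acked.add(offset);
        }
        // The commit never moves past 4, so the partition is blocked indefinitely.
        System.out.println(nextCommitOffset(4, acked)); // prints 4
    }
}
{code}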





[jira] [Commented] (STORM-2546) Kafka spout can stall / get stuck due to edge case with failing tuples

2017-06-10 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045632#comment-16045632
 ] 

Stig Rohde Døssing commented on STORM-2546:
---

Will look at fixing this once https://github.com/apache/storm/pull/2156 has 
been resolved.

> Kafka spout can stall / get stuck due to edge case with failing tuples
> --
>
> Key: STORM-2546
> URL: https://issues.apache.org/jira/browse/STORM-2546
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.x
>Reporter: Prasanna Ranganathan
>Assignee: Stig Rohde Døssing
>
> The mechanism for replaying a failed tuple involves seeking the kafka 
> consumer to the failing offset and then re-emitting it into the topology. A 
> tuple, when emitted the first time, will have an entry created in 
> OffsetManager. This entry will be removed only after the tuple is 
> successfully acknowledged and its offset successfully committed. Till then, 
> commits for offsets beyond the failing offset for that TopicPartition will be 
> blocked.
> It is possible that when the spout seeks the consumer to the failing offset, 
> the corresponding kafka message is not returned in the poll response. This 
> can happen due to that offset being deleted or compacted away. In this 
> scenario that partition will be blocked from committing and progressing.





[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-10 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045629#comment-16045629
 ] 

Stig Rohde Døssing commented on STORM-2343:
---

[~ranganp] Put up a PR for the proposed fix here 
https://github.com/apache/storm/pull/2156

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> ---
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.





[jira] [Created] (STORM-2549) The fix for STORM-2343 is incomplete, and the spout can still get stuck on failed tuples

2017-06-10 Thread JIRA
Stig Rohde Døssing created STORM-2549:
-

 Summary: The fix for STORM-2343 is incomplete, and the spout can 
still get stuck on failed tuples
 Key: STORM-2549
 URL: https://issues.apache.org/jira/browse/STORM-2549
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 2.0.0, 1.1.0
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


Example:
Say maxUncommittedOffsets is 10, maxPollRecords is 5, and the committedOffset is 0.
The spout will initially emit up to offset 10, because it is allowed to poll until numNonRetriableTuples is >= maxUncommittedOffsets.
If offset 10 fails, the spout will be allowed to emit another 5 tuples, so offsets 10-14 will get emitted. If offset 1 then fails and 2-14 get acked, the spout gets stuck because it counts the "extra tuples" 11-14 in numNonRetriableTuples.

A similar case is the one where maxPollRecords doesn't divide maxUncommittedOffsets evenly. If maxPollRecords were 3 in the example above, the spout might immediately emit offsets 1-12. If 2-12 then get acked, offset 1 cannot be reemitted.
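
A self-contained numeric sketch of the first example, using an assumed gating rule (retries are scheduled only while the non-retriable uncommitted count is below maxUncommittedOffsets) that simplifies the behavior described in this ticket:

{code:java}
public class ExtraTuplesStuckSketch {
    public static void main(String[] args) {
        int maxUncommittedOffsets = 10;

        // Offsets 1-14 were emitted: 1-10 initially, then 10-14 after offset 10 failed.
        // Offset 1 then fails; 2-14 are acked but stay uncommitted because the
        // failed offset 1 blocks the commit.
        int emittedButUncommitted = 14; // offsets 1-14
        int retriable = 1;              // offset 1
        int nonRetriableUncommitted = emittedButUncommitted - retriable; // 13

        // The "extra" offsets 11-14 push the count past the limit, so the retry of
        // offset 1 is never scheduled and the spout is stuck.
        boolean canScheduleRetry = nonRetriableUncommitted < maxUncommittedOffsets;
        System.out.println("nonRetriable=" + nonRetriableUncommitted
                + ", canRetryOffset1=" + canScheduleRetry); // nonRetriable=13, canRetryOffset1=false
    }
}
{code}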

The proposed solution is the following:
* Enforce maxUncommittedOffsets on a per-partition basis (i.e. the effective limit is maxUncommittedOffsets multiplied by the number of partitions) by always allowing polls for retriable tuples that are within maxUncommittedOffsets tuples of the committed offset. Pause any non-retriable partition that has passed the maxUncommittedOffsets limit while some other partition, also at the maxUncommittedOffsets limit, is polling for retries.

Example of this functionality:
MaxUncommittedOffsets is 100.
MaxPollRecords is 10.
The committed offset for partitions 0 and 1 is 0.
Partition 0 has emitted offset 0.
Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away).
On partition 1, message 99 is retriable.
We check that message 99 is within 100 emitted tuples of offset 0 (it is the 97th tuple after offset 0, so it is).
We do not pause partition 0, because that partition isn't at the maxUncommittedOffsets limit.
We seek to offset 99 on partition 1 and poll.
We get back offsets 99, 101, and 103, plus potentially 7 new tuples. Say the lowest of these is at offset 104.
The spout emits offset 99, filters out 101 and 103 because they were already emitted, and emits the 7 new tuples.
If offset 104 (or any later offset) becomes retriable, it is not retried until the committed offset moves, because offset 104 is the 101st tuple emitted after offset 0.
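
A self-contained sketch of the retry-eligibility check from this walkthrough: a retriable offset may only be polled for if it lies within maxUncommittedOffsets emitted tuples of the committed offset. The helper and its counting convention are assumptions for illustration, not the actual spout code.

{code:java}
import java.util.NavigableSet;
import java.util.TreeSet;

public class RetryEligibilitySketch {
    static boolean withinUncommittedLimit(long committedOffset, long retriableOffset,
                                          NavigableSet<Long> emittedOffsets,
                                          int maxUncommittedOffsets) {
        // Count emitted (uncommitted) offsets from the committed offset up to and
        // including the retriable one; compacted gaps were never emitted, so they
        // don't count against the limit.
        int emittedUpToRetriable =
                emittedOffsets.subSet(committedOffset, true, retriableOffset, true).size();
        return emittedUpToRetriable <= maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        // Partition 1 from the example: emitted 0...95, 97, 99, 101, 103; committed offset 0.
        TreeSet<Long> emitted = new TreeSet<>();
        for (long offset = 0; offset <= 95; offset++) {
            emitted.add(offset);
        }
        emitted.add(97L); emitted.add(99L); emitted.add(101L); emitted.add(103L);

        // Offset 99 is within 100 emitted tuples of the committed offset, so it may be retried.
        System.out.println(withinUncommittedLimit(0, 99, emitted, 100));  // true

        // After the retry poll the spout also emits 7 new tuples starting at offset 104;
        // 104 is beyond the limit, so it cannot be retried until the committed offset moves.
        for (long offset = 104; offset <= 110; offset++) {
            emitted.add(offset);
        }
        System.out.println(withinUncommittedLimit(0, 104, emitted, 100)); // false
    }
}
{code}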





[jira] [Created] (STORM-2548) Simplify KafkaSpoutConfig

2017-06-10 Thread JIRA
Stig Rohde Døssing created STORM-2548:
-

 Summary: Simplify KafkaSpoutConfig
 Key: STORM-2548
 URL: https://issues.apache.org/jira/browse/STORM-2548
 Project: Apache Storm
  Issue Type: Improvement
Affects Versions: 2.0.0
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


Some suggestions for simplifying KafkaSpoutConfig from the mailing list (a usage sketch follows the list):

* We should not duplicate properties that users would normally set in the 
KafkaConsumer properties map. We should just have a setter (setProp) for 
setting properties in that map. For instance, setGroupId is just duplicating a 
setting that the user should be able to set directly in the consumer properties.

* We should get rid of the key/value deserializer setters. Setting the deserializers as classes is something the user can just as well do with setProp. The SerializableDeserializer class should be removed: it only offers extra type safety when users define their own deserializer type and can subclass SerializableDeserializer, and the setters don't work with the built-in Kafka deserializers.
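
A sketch of what builder usage could look like if both suggestions were adopted. The generic setProp(key, value) setter is the proposed addition, not an existing method at the time of this discussion, and the exact builder shape shown is an assumption; the ConsumerConfig keys are the standard Kafka ones.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class SimplifiedConfigSketch {
    public static KafkaSpoutConfig<String, String> buildConfig() {
        return KafkaSpoutConfig.builder("broker1:9092", "my-topic")
                // Instead of setGroupId(...), set the consumer property directly.
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
                // Instead of dedicated deserializer setters, pass the classes as consumer
                // properties; this also works for the built-in Kafka deserializers.
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                .build();
    }
}
{code}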





[jira] [Commented] (STORM-2359) Revising Message Timeouts

2017-06-10 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045442#comment-16045442
 ] 

Stig Rohde Døssing commented on STORM-2359:
---

I think the reason it would be tough to move timeouts entirely to the ackers is 
that we'd need to figure out how to deal with message loss between the spout 
and acker when the acker sends a timeout message to the spout. The current 
implementation errs on the side of caution by always reemitting if it can't 
positively say that a tuple has been acked. I'm not sure how we could do the 
same when the acker has to notify the spout to reemit after the timeout, 
because that message could be lost.

It might, as you mention, be a good idea to instead have two timeouts: a short one for the acker and a much longer one for the spout. It would probably mean that messages whose acker init message is lost will take much longer to retry than messages that are lost elsewhere, but it might allow us to keep timeout resets out of the spout.

Tuples can be lost if a worker dies, but what if there's a network issue? Can't messages also be lost then?

> Revising Message Timeouts
> -
>
> Key: STORM-2359
> URL: https://issues.apache.org/jira/browse/STORM-2359
> Project: Apache Storm
>  Issue Type: Sub-task
>  Components: storm-core
>Affects Versions: 2.0.0
>Reporter: Roshan Naik
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  
> https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing





[jira] [Commented] (STORM-2343) New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

2017-06-10 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16045434#comment-16045434
 ] 

Stig Rohde Døssing commented on STORM-2343:
---

I'll take a look at implementing this fix soon. 

> New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets 
> tuples fail at once
> ---
>
> Key: STORM-2343
> URL: https://issues.apache.org/jira/browse/STORM-2343
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Critical
> Fix For: 2.0.0, 1.1.1
>
>  Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> It doesn't look like the spout is respecting maxUncommittedOffsets in all 
> cases. If the underlying consumer returns more records in a call to poll() 
> than maxUncommittedOffsets, they will all be added to waitingToEmit. Since 
> poll may return up to 500 records by default (Kafka 0.10.1.1), this is pretty 
> likely to happen with low maxUncommittedOffsets.
> The spout only checks for tuples to retry if it decides to poll, and it only 
> decides to poll if numUncommittedOffsets < maxUncommittedOffsets. Since 
> maxUncommittedOffsets isn't being respected when retrieving or emitting 
> records, numUncommittedOffsets can be much larger than maxUncommittedOffsets. 
> If more than maxUncommittedOffsets messages fail, this can cause the spout to 
> stop polling entirely.


