[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-05-01 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/1924
  
Thanks @srdo for your patience. Merged into 1.x & master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-04-07 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
Squashed and rebased




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-04-05 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@pavankumarp89 It should probably go in 1.2.0 or something like that. I 
definitely want it backported to 1.x, but I think we're trying to avoid API 
breakage in patch versions, and this PR changes the RetryService API.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-04-05 Thread pavankumarp89
Github user pavankumarp89 commented on the issue:

https://github.com/apache/storm/pull/1924
  
Can you please let us know if this fix will be applied to older versions, e.g. 
1.0.3 or 1.0.4?




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-04-03 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@HeartSaVioR @hmcl Do you guys have time to take another look? (again, very 
sorry about the number of times I've declared this "done")




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-24 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
Very sorry about this, but this is still broken :( 

In the current implementation (pre this PR), the spout can be prevented 
from polling if there are more than maxUncommittedOffsets failed tuples. This 
is because the check for whether to poll isn't accounting for retriable tuples 
already being counted in the numUncommittedOffsets count.

The current fix is to always allow failed tuples to retry, even if 
maxUncommittedOffsets is exceeded, because failed tuples don't contribute 
further to the numUncommittedOffsets count. The problem has been to ensure that 
we don't also emit a bunch of new tuples when polling for retriable tuples, and 
end up ignoring maxUncommittedOffsets entirely. This was partially fixed by 
pausing partitions that have no retriable tuples.

With the previous seeking behavior, this was a complete fix, since 
maxPollRecords put a bound on how far the spout could read from the commit 
offset in a poll that was ignoring maxUncommittedOffsets. With the new seeking 
behavior this is broken. If the spout has emitted as many tuples as allowed, 
and the last (highest offset) tuple fails, the spout may now poll for a full 
batch of new tuples, starting from the failed tuple. This scenario can repeat 
arbitrarily many times, so maxUncommittedOffsets is completely ineffective. 

We don't want to go back to the old seeking behavior (IMO), because it 
meant that in the case where maxPollRecords is much lower than 
maxUncommittedOffsets (almost always), the spout might end up choking on failed 
tuples. For example, if maxPollRecords is 5, and tuples 0-4 are not ready for 
retry (they might have been retried already, and are now waiting for retry 
backoff), but tuples 5-9 are, the spout is unable to retry 5-9 (or anything else 
on that partition) because it keeps seeking back to 0 and polling out the 
first 5 tuples. Seeking directly to the retriable tuples should in most cases 
be more efficient as well, because in the old implementation we'd just be 
seeking to the last committed offset, polling, and discarding tuples until we 
reach the ones that can be retried.

We could probably fix the broken behavior by trying really hard not to emit 
new tuples when we're ignoring maxUncommittedOffsets, but that seems like it 
would be error prone and complicated to implement.

I think we might be able to fix this by ensuring that we don't 
"double-count" retriable tuples. When the spout is deciding whether to poll, it 
should deduct retriable tuples from numUncommittedOffsets when comparing to 
maxUncommittedOffsets.

Changing the poll check in this way is the same as enforcing the following 
constraint per partition, it seems to me:
* Poll only if `numNonRetriableEmittedTuples < maxUncommittedOffsets`. If 
there are more nonretriable tuples than that, the poll won't be allowed because 
`numUncommittedOffsets = numRetriableTuples + numNonRetriableEmittedTuples`, so 
`numUncommittedOffsets - numRetriableTuples >= maxUncommittedOffsets`. 

This should mean that the limit on uncommitted tuples on each partition is 
going to be `maxUncommittedOffsets + maxPollRecords - 1`, because the latest 
tuple that can be retried on a partition is the one at offset 
`maxUncommittedOffsets`, where there are `maxUncommittedOffsets - 1` 
uncommitted tuples "to the left". If the retry poll starts at that offset, it 
at most emits the retried tuple plus `maxPollRecords - 1` new tuples.
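A minimal sketch of that per-partition check; the method and parameter names below are hypothetical illustrations of the discussion, not the actual KafkaSpout code:

```java
// Illustrative sketch of the poll check described above; names are
// hypothetical, not the actual KafkaSpout implementation.
public class PollCheckSketch {
    static boolean shouldPoll(int numUncommittedOffsets,
                              int numRetriableTuples,
                              int maxUncommittedOffsets) {
        // Deduct retriable tuples so they aren't counted twice: they were
        // already included in numUncommittedOffsets when first emitted.
        return numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        // 10 uncommitted offsets, 3 of them retriable, limit 10:
        // 10 - 3 = 7 < 10, so the retry poll is allowed.
        System.out.println(shouldPoll(10, 3, 10)); // true
        // Limit reached with no retriable tuples: poll is blocked.
        System.out.println(shouldPoll(10, 0, 10)); // false
    }
}
```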

There shouldn't be any problems when multiple partitions have retriable 
tuples, where retriable tuples on one partition might be able to cause a 
different partition to break the uncommitted offset limit. This is because a 
partition will at minimum contribute 0 to numUncommittedOffsets (e.g. if all 
uncommitted tuples on that partition are retriable), because any retriable 
tuples being subtracted were already counted in numUncommittedOffsets when the 
tuples were originally emitted.

If we can enforce the limit on a per partition basis this way, there's no 
reason to worry about only emitting retriable tuples when we're exceeding 
maxUncommittedOffsets. 

I don't think there's a need for pausing partitions anymore either. It was 
meant to prevent polling for new tuples when there were retriable tuples, but 
we're no longer trying to prevent that, since the per partition cap is already 
ensuring we won't emit too many tuples. Pausing in this case would prioritize 
retriable tuples over new tuples (e.g. in the case where an unpaused consumer 
might choose to fetch from a nonretriable partition even though there are 
retriable tuples), but might lead to lower throughput overall (in the case 
where there are not enough messages on the retriable partitions to fill a 
batch). I've removed it again.

I've put up what I hope is the fix both here and on the 1

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo 
Thanks for your patience. +1

I can squash this and merge, but seems like #2004 depends on this so it 
would be clearer if you squash the commits for this PR (and for 1.x branch 
#2022 too), and rebase #2004 after merging this.

Let me know when you finish squashing. Thanks again!




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-22 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
+1. 

@srdo I forgot to say in the last review: can you please squash the 
commits? Once that is done, as far as I am concerned this patch is ready to merge. 

@HeartSaVioR can you please do your last pass? Thanks.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-22 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
Thanks for the reviews guys.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-21 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo @HeartSaVioR I will do the last pass on this first thing in the 
morning such that we can merge this in.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-21 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@HeartSaVioR I think as long as the spout can seek to the tuples it needs 
to retry instead of seeking to the committed offset, all of the minor issues I 
could think of are resolved. 




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-21 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl 
I think @srdo addressed your last review comments. Could you finish the 
review?
Please let me know if there are any points we need to discuss or resolve, 
so that we can address them now, or file new issues and postpone them to the next 
release if possible.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-20 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
Sorry to keep changing this. I hope these are the last few changes. The 
spout should now seek to the lowest retriable offset for each partition when 
retrying tuples, instead of seeking back to the last committed offset. This 
means there's no longer any reason, as far as I can see, to worry about 
maxPollRecords being set too low compared to maxUncommittedOffsets, so I've 
reverted that part of KafkaSpoutConfig.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-14 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl Thanks for the explanation. It makes more sense now.

I don't think we can meaningfully handle the case where the user has 
specified no cap on retries and one or more tuples keep failing.

In the situation you describe, `maxUncommittedOffsets` isn't really solving 
the problem. If offset 3 fails and all other tuples can be acked, the spout 
will emit another `maxUncommittedOffsets` tuples on the same partition as 3, 
and will then stop emitting because it can't commit any tuples until 3 is 
acked. At this point, only offset 3 will get re-emitted until it hopefully 
succeeds at some point, so having a continually failing tuple (or just one that 
fails many times) will still "clog" the spout.

If `maxUncommittedOffsets` is removed, you're right that the `acked` map 
size is technically uncapped, but given the amount of information we store, and 
how the spout handles retries, I think it is a non-issue. Just to illustrate, 
say we remove `maxUncommittedOffsets`, and let's say again that offset 3 is 
failing a large number of times, and the last acked offset is far beyond it, 
e.g. 500. When 3 is ready for retry, the consumer is reset to that position 
on the relevant partition. This means it fetches and emits message 3. It will 
also have to fetch 4...500 (potentially over many calls to `poll()`), 
because we never seek forward to where we left off. The spout therefore has to 
spend time fetching and discarding all tuples up to 500 before it can 
finally emit another tuple. It seems likely that it'll hit the point where it 
is spending all its time fetching and discarding acked tuples well before 
it runs out of memory to store their offsets.

Disregarding the issue with consumer seeking, because my reasoning relies 
on an implementation detail of how we do retries, I'm still not sure 
`maxUncommittedOffsets` is allowing higher throughput compared to not having 
it. If we allow the spout to fail with an `OutOfMemoryError`, the spout will 
have had higher throughput up to the crash than if it were being throttled by 
`maxUncommittedOffsets` (because otherwise it would also have had an OOME in 
that case). It really seems to me like all `maxUncommittedOffsets` is doing is 
trading having the spout potentially cause an OOME due to `acked` size, in 
exchange for making the spout react to the same situation by not emitting any 
more tuples. I'm not sure that is better, because data flow stops in either 
case. 

`maxUncommittedOffsets` could have some value in a system where the 
`KafkaSpout` is a secondary stream source and the messages coming out of it 
aren't time sensitive. In that case it might be fine to let the `KafkaSpout` 
stop emitting tuples for a while if some tuples temporarily can't be acked, but 
having it cause an OOME would be too disruptive because the primary spout could 
be doing fine. I can't come up with a concrete example of this kind of 
configuration though.

I'd be fine with keeping `maxUncommittedOffsets` for that kind of situation.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-14 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo I am reading through your detailed explanation again and will let you 
know my opinion concerning the need, or no need, for the parameter 
`maxUncommittedOffsets`, based on all the facts you state. 

Prior to that, however, I will share right away what is the rationale 
behind `maxUncommittedOffsets` and why I think we still need it. 

`maxUncommittedOffsets` is unrelated to 
`ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and to 
`Config#TOPOLOGY_MAX_SPOUT_PENDING`. Its purpose is to allow the `KafkaSpout` 
to keep polling/fetching records from Kafka while imposing a (memory 
consumption) limit on how many offsets can be **polled and not committed over 
ANY ARBITRARY NUMBER of polls to Kafka (not just one poll)**, before the 
KafkaSpout stops polling. 

This limit is necessary because if e.g. tuple for offset **3** keeps 
failing, and all subsequent offsets **(4, 5, 6, ... 1M, ... 1B, ...)** are 
acked, the spout won't be able to commit to Kafka any offset greater than 3, 
unless offset 3 reaches **maxNumberOfRetrials**. If there is no limit on 
**maxNumberOfRetrials** (in order to guarantee at-least-once delivery), and 
offset 3 keeps failing forever, the spout won't ever be able to commit any 
offset > 3. In the limit this would cause the `Map acked` to grow to infinite size (despite its small footprint, 
consisting mainly of offsets and some small bookkeeping objects - it 
doesn't hold `ConsumerRecord`s). 

In order to put a cap on the size of this map, `maxUncommittedOffset` was 
created to stop polling if this number is too high, and avoid 
`OutOfMemoryError`. **The purpose of the KafkaSpout keeping on polling despite 
a particular offset (tuple) keeping on failing is to increase the overall 
throughput.** If at some point the offset 3 is successfully acked, the max 
subset of continuous offsets acked would be committed to Kafka in the next 
`Timer commitTimer` cycle, and the size of `Map 
acked` would be reduced (potentially by a lot). The spout could then keep doing 
its work and the overall throughput would be much higher.

`maxUncommittedOffsets` is not a hard limit parameter, and I don't think 
that we have to enforce that the KafkaSpout never exceeds this number. It is OK 
to exceed it as long as the JVM doesn't throw `OutOfMemoryError`. Currently, 
and prior to these proposed changes, in the worst case the number of 
uncommitted offsets is upper bounded by **maxUncommittedOffsets + 
maxFetchRecords - 1**, which is OK as long as there is memory. 
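That worst-case bound can be checked with a quick calculation; the values below are made up for illustration, not defaults of the actual spout:

```java
// Worked numeric example of the worst-case bound above, using made-up values.
public class UncommittedBoundExample {
    static int worstCaseUncommitted(int maxUncommittedOffsets, int maxPollRecords) {
        // Polling is still allowed while the uncommitted count is at most
        // maxUncommittedOffsets - 1, and a single poll can then add up to
        // maxPollRecords more offsets.
        return (maxUncommittedOffsets - 1) + maxPollRecords;
    }

    public static void main(String[] args) {
        // 100 and 50 are illustrative values only.
        System.out.println(worstCaseUncommitted(100, 50)); // 149
    }
}
```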

Perhaps `maxUncommittedOffsets` is not the clearest name. We can 
either change the name (which can cause backward compatibility issues and/or 
confusion), or document properly what this parameter really means. The bottom 
line is that I think that we need a parameter doing the job that 
`maxUncommittedOffsets` is currently doing, which I don't think can be 
accomplished with a combination of `ConsumerConfig#MAX_POLL_RECORDS_CONFIG` 
and/or `Config#TOPOLOGY_MAX_SPOUT_PENDING`.

The name `maxUncommittedOffsets` would more precisely be something like 
`allowPollingIfLessThanMaxUncommittedOffsets`.

Please let me know your thoughts. I will still go over your explanation 
again in the meantime.





[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-14 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl @HeartSaVioR I think if we can't think of a good use case for 
maxUncommittedOffsets, we'd be better off removing it.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-14 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@HeartSaVioR @srdo reviewing this right now.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl 
I guess you're busy, but could you continue reviewing this one, or please 
let me know if you won't be available to review soon? I'd like to get this 
merged soon since it is necessary for 1.1.0.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1924
  
+1
Once @hmcl finishes the review and there are no outstanding comments, I'll merge.

Btw, regarding maxUncommittedOffsets: yes, it provides more guarantees, but 
if we guarantee that commits occur periodically and on time (we're providing 
that option too), IMHO that's enough.

The thing I have in mind is that committing in nextTuple() doesn't strictly 
guarantee commits happen on time. Two situations come to mind: reaching max 
spout pending, and backpressure being in place.

If the spout has reached max spout pending: since we emit **at most one 
tuple** per nextTuple() call, nextTuple() can be called again when a tuple 
tree completes or a tuple fails. At worst, nextTuple() will be called after the 
tuple timeout (actually up to 2 * the tuple timeout due to the underlying 
implementation).
So if users can tolerate this it might be OK, but it could be far from the 
configured value.

If backpressure is in place, nextTuple() will never be called. We can't 
guarantee any time frame here.

If we want to strictly guarantee that commits occur on time, they should 
happen apart from the spout's lifecycle, e.g. via a timer. That would also 
introduce some thread-safety handling, so it's a bit more complicated, but IMO 
still simpler than respecting maxUncommittedOffsets.
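The timer idea could look roughly like this; everything below is a hypothetical sketch (class and method names included), not the actual spout code, which at this point commits from within nextTuple():

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative-only sketch: committing on a timer that runs apart from the
// spout lifecycle, so commits still happen on schedule even when nextTuple()
// is starved by max spout pending or backpressure.
public class TimerCommitSketch {
    private long highestAckedOffset = -1;
    private long committedUpTo = -1;

    // The commit check runs on the scheduler thread while acks arrive on the
    // spout thread, hence the synchronization below.
    void start(ScheduledExecutorService scheduler, long intervalMs) {
        scheduler.scheduleAtFixedRate(this::commitIfNeeded, intervalMs, intervalMs,
                TimeUnit.MILLISECONDS);
    }

    synchronized void onAck(long offset) {
        if (offset > highestAckedOffset) {
            highestAckedOffset = offset;
        }
    }

    synchronized void commitIfNeeded() {
        if (highestAckedOffset > committedUpTo) {
            // A real implementation would call KafkaConsumer.commitSync(...) here.
            committedUpTo = highestAckedOffset;
        }
    }

    synchronized long committedUpTo() {
        return committedUpTo;
    }
}
```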




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-07 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@HeartSaVioR I agree, I think it would be much simpler to rely on 
maxSpoutPending. maxUncommittedOffsets does have a slightly different meaning 
though. With only maxSpoutPending, the spout might progress arbitrarily far 
past the last committed offset, since there's only a limit on how many tuples 
are in-flight at once, rather than on how many tuples have been emitted past 
the committed offset. If a user needs to cap how far the spout can read ahead 
for some reason, they can't do that with maxSpoutPending.

I can't tell if that's a real need anyone has though. @hmcl any opinion?




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo apologies for the little delay. I had a lot of deadlines last week. I 
will finish reviewing this today.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-03-02 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl I made the changes mentioned in my last comment. 

As far as I can tell there's no way to get the default max.poll.records out 
of Kafka, so for now I've made do with adding comments in KafkaSpoutConfig 
recommending that maxUncommittedOffsets and maxPollRecords should be equal.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-28 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
I think it would be good to get some of the assumptions I've made down in 
writing, so I just wrote down a more thorough explanation of the issues I think 
this PR should fix, and what changes I think it should make:

The intention of this PR is to fix an issue where the spout has emitted 
maxUncommittedOffsets (or more) tuples, and some of them fail, and the spout 
then hangs because the `numUncommittedOffsets < maxUncommittedOffsets` check 
when deciding whether to poll doesn't account for retriable tuples. Blindly 
polling when there are retriable tuples will fix this issue, but leads to us 
not being able to bound the number of offsets the consumer has read past the 
last committed offset.

For an example of a case where we can't put a bound on uncommitted offsets, 
say that partition 0 has maxUncommittedOffsets + maxPollRecords emitted tuples 
past the commit offset, and partition 1 has no emitted tuples (or any number 
below maxUncommittedOffsets, it doesn't matter).

When tuples fail and become retriable on partition 0, the spout would 
blindly poll for more tuples. If it gets tuples from partition 0 it's okay, 
since it seeks back to the committed offset for that partition, so it won't go 
beyond committedOffset + maxPollRecords (since the retry logic seeks back to 
the committed offset on every poll, the spout can't go more than maxPollRecords 
beyond that offset). If it gets tuples from partition 1 we're effectively just 
emitting new tuples while ignoring maxUncommittedOffsets. Since we aren't 
controlling which partitions the polled tuples may come from, we can't say 
anything meaningful about a cap on uncommitted offsets. While I think it would 
probably work out to being capped anyway, due to the consumer consuming 
partitions round robin (AFAIK), I'd prefer if the spout implementation doesn't 
make assumptions beyond those guaranteed by the consumer API (in which case we 
should assume any call to `KafkaConsumer.poll` could return messages for any 
assigned non-paused partition).

So the way to fix that issue is to ensure that if we're polling for 
retriable tuples, we only poll on those partitions that have retriable tuples 
(i.e. we pause the others when doing `doSeekRetriableTopicPartitions`, then 
resume all assigned after calling `KafkaConsumer.poll`). Pausing the other 
partitions should only affect that poll cycle, since once retriable tuples get 
emitted they're no longer retriable, and we won't hit 
`doSeekRetriableTopicPartitions` for those tuples again right away. In most 
polls (no failed tuples, or tuples are failed but not ready for retry), we 
won't hit the pausing case.

If we pause partitions with no retriable tuples when polling on partitions 
with retriable tuples, we should be able to guarantee that any partition never 
gets more than maxUncommittedOffsets + maxPollRecords - 1 past the last 
committed offset. In the case where there are no failed tuples, we can reach 
the limit by having maxUncommittedOffsets - 1 emitted offsets, and polling 
once, getting up to maxPollRecords more. If there are retriable tuples, pausing 
will stop us from ignoring the maxUncommittedOffsets cap for partitions with no 
retriable tuples, and the partitions with retriable tuples won't get more than 
maxPollRecords beyond the last committed offset, since the consumer seeks back 
to that offset when polling for retriable offsets.
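The pausing rule above can be sketched as pure set logic. Partition names are modeled as plain strings here for illustration; the real spout would use `org.apache.kafka.common.TopicPartition` together with `KafkaConsumer.pause()`/`resume()`:

```java
import java.util.HashSet;
import java.util.Set;

// Pure-logic sketch of the pausing rule: when polling for retriable tuples,
// pause every assigned partition that has none, for that one poll only.
// Not the actual KafkaSpout code.
public class PauseSetSketch {
    static Set<String> partitionsToPause(Set<String> assigned, Set<String> retriable) {
        Set<String> toPause = new HashSet<>(assigned);
        toPause.removeAll(retriable); // only non-retriable partitions get paused
        return toPause;
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("topic-0", "topic-1", "topic-2");
        Set<String> retriable = Set.of("topic-1");
        // Only topic-1 has retriable tuples, so topic-0 and topic-2 are paused.
        System.out.println(partitionsToPause(assigned, retriable));
    }
}
```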

There's a second minor issue I'd like this PR to address:
If maxPollRecords isn't exactly equal to maxUncommittedOffsets, the spout 
can behave in some undesirable ways.
* If maxPollRecords is greater than maxUncommittedOffsets, the 
maxUncommittedOffsets limit may be exceeded on any one poll. In this case 
there's no reason to have 2 separate variables, since the net effect is the 
same as setting maxUncommittedOffsets to be equal to maxPollRecords. 
* If maxPollRecords is less than maxUncommittedOffsets, there's a risk of 
the spout getting stuck on some tuples for a while when it is retrying tuples.
  Say there are 10 retriable tuples following the last committed offset, 
and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 
retriable tuples are reemitted in the first batch, the next 5 tuples can't be 
emitted until (some of) the first 5 are acked. This is because the spout will 
seek the consumer back to the last committed offset any time there are failed 
tuples, which will lead to it getting the first 5 tuples out of the consumer, 
checking that they are emitted, and skipping them. This will repeat until the 
last committed offset moves. If there are other partitions with tuples 
available, those tuples may get emitted, but the "blocked" partition won't 
progress until some tuples are acked on it.
  
I think it might make sense to remove maxUncommittedOffsets entirely, and 
hav

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-27 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl yes, more or less.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-27 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo what you mean is that if `numUncommittedOffsets > maxUncommittedOffsets` 
the Spout should still emit the retries, but not poll new records. Is that it?




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-27 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl Yes, I believe so. There is still a problem where the spout will stop 
emitting tuples if more than numUncommittedOffsets tuples have failed.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-27 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo is this patch still relevant in light of the other patches that got 
merged in?




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-16 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1924
  
@srdo Thanks for the note. I will take a look later today.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-16 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
@hmcl I think this is ready for another look when you get a chance.




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-05 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
This also changes the exponential backoff formula from 
`currentTime + delayPeriod^(failCount-1)` to 
`currentTime + delayPeriod*2^(failCount-1)`. Raising the delay period itself 
to a power makes the delay grow extremely quickly, and probably wasn't 
intended.

It might make sense to add jitter to the backoff as well.
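
A sketch of the two formulas side by side, plus the suggested jitter 
(variable names mirror the discussion; this is not the actual 
KafkaSpoutRetryExponentialBackoff implementation):

```python
import random

def old_delay(delay_period, fail_count):
    # delayPeriod^(failCount-1): the base of the exponent is the delay
    # period itself, so the delay explodes after a few failures.
    return delay_period ** (fail_count - 1)

def new_delay(delay_period, fail_count):
    # delayPeriod * 2^(failCount-1): standard exponential backoff.
    return delay_period * 2 ** (fail_count - 1)

def new_delay_with_jitter(delay_period, fail_count, rng=random):
    # "Full jitter": pick uniformly in [0, backoff] so retries from many
    # tuples don't all fire at the same instant.
    return rng.uniform(0, new_delay(delay_period, fail_count))

# With delayPeriod = 100 ms, the old formula reaches 1,000,000 ms by the
# fourth failure, while the new one reaches only 800 ms.
for fail_count in range(1, 5):
    print(fail_count, old_delay(100, fail_count), new_delay(100, fail_count))
```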




[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

2017-02-05 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1924
  
I think it would be best if we merged maxPollOffsets and 
maxUncommittedOffsets, since having them be different has some undesirable 
side effects. @hmcl do you have an opinion?

Also this drops support for Kafka 0.9 in KafkaSpoutConfig, since I believe 
https://github.com/apache/storm/pull/1556 already means that this release won't 
work with 0.9.

