[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...
Github user harshach commented on the issue: https://github.com/apache/storm/pull/1924 Thanks @srdo for your patience. Merged into 1.x & master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

Squashed and rebased.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@pavankumarp89 It should probably go in 1.2.0 or something like that. I definitely want it backported to 1.x, but I think we're trying to avoid API breakage in patch versions, and this PR changes the RetryService API.
Github user pavankumarp89 commented on the issue: https://github.com/apache/storm/pull/1924

Can you please let us know if this fix will be applied to an earlier version such as 1.0.4 or 1.0.3?
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@HeartSaVioR @hmcl Do you guys have time to take another look? (Again, very sorry about the number of times I've declared this "done".)
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

Very sorry about this, but this is still broken :(

In the current implementation (pre this PR), the spout can be prevented from polling if there are more than maxUncommittedOffsets failed tuples. This is because the check for whether to poll isn't accounting for retriable tuples already being counted in the numUncommittedOffsets count. The current fix is to always allow failed tuples to retry, even if maxUncommittedOffsets is exceeded, because failed tuples don't contribute further to the numUncommittedOffsets count.

The problem has been to ensure that we don't also emit a bunch of new tuples when polling for retriable tuples, and end up ignoring maxUncommittedOffsets entirely. This was partially fixed by pausing partitions that have no retriable tuples. With the previous seeking behavior, this was a complete fix, since maxPollRecords put a bound on how far the spout could read from the commit offset in a poll that was ignoring maxUncommittedOffsets.

With the new seeking behavior this is broken. If the spout has emitted as many tuples as allowed, and the last (highest offset) tuple fails, the spout may now poll for a full batch of new tuples, starting from the failed tuple. This scenario can repeat arbitrarily many times, so maxUncommittedOffsets is completely ineffective.

We don't want to go back to the old seeking behavior (IMO), because it meant that in the case where maxPollRecords is much lower than maxUncommittedOffsets (almost always), the spout might end up choking on failed tuples. For example, if maxPollRecords is 5, and tuples 0-4 are not ready for retry (they might have been retried already, and are now waiting for retry backoff), but tuples 5-9 are, the spout is unable to retry 5-9 (or anything else on that partition) because it keeps seeking back to 0, and polling out the first 5 tuples. Seeking directly to the retriable tuples should in most cases be more efficient as well, because in the old implementation we'd just be seeking to the last committed offset, polling, and discarding tuples until we reach the ones that can be retried.

We could probably fix the broken behavior by trying really hard not to emit new tuples when we're ignoring maxUncommittedOffsets, but that seems like it would be error prone and complicated to implement. I think we might be able to fix this by ensuring that we don't double-count retriable tuples. When the spout is deciding whether to poll, it should deduct retriable tuples from numUncommittedOffsets when comparing to maxUncommittedOffsets. Changing the poll check in this way is the same as enforcing the following constraint per partition, it seems to me:

* Poll only if `numNonRetriableEmittedTuples < maxUncommittedOffsets`. If there are more non-retriable tuples than that, the poll won't be allowed because `numUncommittedOffsets = numRetriableTuples + numNonRetriableEmittedTuples`, so `numUncommittedOffsets - numRetriableTuples >= maxUncommittedOffsets`.

This should mean that the limit on uncommitted tuples on each partition is going to be `maxUncommittedOffsets + maxPollRecords - 1`, because the latest tuple that can be retried on a partition is the one at offset `maxUncommittedOffsets`, where there are `maxUncommittedOffsets - 1` uncommitted tuples "to the left". If the retry poll starts at that offset, it at most emits the retried tuple plus `maxPollRecords - 1` new tuples.

There shouldn't be any problems when multiple partitions have retriable tuples, where retriable tuples on one partition might be able to cause a different partition to break the uncommitted offset limit. This is because a partition will at minimum contribute 0 to numUncommittedOffsets (e.g. if all uncommitted tuples on that partition are retriable), because any retriable tuples being subtracted were already counted in numUncommittedOffsets when the tuples were originally emitted.

If we can enforce the limit on a per-partition basis this way, there's no reason to worry about only emitting retriable tuples when we're exceeding maxUncommittedOffsets. I don't think there's a need for pausing partitions anymore either. It was meant to prevent polling for new tuples when there were retriable tuples, but we're no longer trying to prevent that, since the per-partition cap is already ensuring we won't emit too many tuples. Pausing in this case would prioritize retriable tuples over new tuples (e.g. in the case where an unpaused consumer might choose to fetch from a non-retriable partition even though there are retriable tuples), but might lead to lower throughput overall (in the case where there are not enough messages on the retriable partitions to fill a batch). I've removed it again.

I've put up what I hope is the fix both here and on the 1
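The poll check described in the comment above can be sketched as a small gate. This is an illustrative sketch only; the method and parameter names (`shouldPoll`, `numRetriableTuples`, etc.) are hypothetical, not the actual KafkaSpout source:

```java
// Sketch of the proposed poll check: deduct retriable tuples from the
// uncommitted count before comparing against maxUncommittedOffsets, so that
// re-emitting retries is never blocked and retries are not counted against
// the cap a second time. All names here are illustrative.
public class PollGate {

    static boolean shouldPoll(long numUncommittedOffsets,
                              long numRetriableTuples,
                              long maxUncommittedOffsets) {
        // Retriable tuples were already counted in numUncommittedOffsets when
        // they were first emitted, so only the non-retriable remainder counts.
        long numNonRetriableEmitted = numUncommittedOffsets - numRetriableTuples;
        return numNonRetriableEmitted < maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        // All 10 uncommitted tuples are retriable: polling for retries is allowed.
        System.out.println(shouldPoll(10, 10, 10)); // prints true
        // At the cap with nothing retriable: no poll.
        System.out.println(shouldPoll(10, 0, 10)); // prints false
    }
}
```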
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1924

@srdo Thanks for your patience. +1

I can squash this and merge, but it seems #2004 depends on this, so it would be cleaner if you squash the commits for this PR (and for the 1.x branch #2022 too), and rebase #2004 after merging this. Let me know when you finish squashing. Thanks again!
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

+1. @srdo I forgot to say in the last review: can you please squash the commits? Once that is done, as far as I am concerned this patch is ready to merge. @HeartSaVioR can you please do your last pass? Thanks.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

Thanks for the reviews guys.
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

@srdo @HeartSaVioR I will do the last pass on this first thing in the morning so that we can merge this in.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@HeartSaVioR I think as long as the spout can seek to the tuples it needs to retry instead of seeking to the committed offset, all of the minor issues I could think of are resolved.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl I think @srdo addressed your last review comments. Could you finish the review? Please let me know if there are some points we need to discuss or resolve, so that we can address them now, or file new issues and postpone them to the next release if possible.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

Sorry to keep changing this. I hope these are the last few changes. The spout should now seek to the lowest retriable offset for each partition when retrying tuples, instead of seeking back to the last committed offset. This means that, as far as I can see, there's no reason to worry about maxPollRecords being set too low compared to maxUncommittedOffsets, so I've reverted that part of KafkaSpoutConfig.
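The new seeking behavior described above (seek each partition to its lowest retriable offset rather than the last committed offset) can be sketched roughly as follows. Partitions are modeled as plain `String`s and all names are hypothetical; real code would operate on `TopicPartition` and call `KafkaConsumer.seek`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative sketch, not the actual spout code: compute, per partition, the
// lowest retriable offset so a retry poll can start there instead of at the
// last committed offset (avoiding fetching and discarding already-acked records).
public class RetrySeek {

    static Map<String, Long> seekTargets(Map<String, NavigableSet<Long>> retriableOffsets) {
        Map<String, Long> targets = new HashMap<>();
        for (Map.Entry<String, NavigableSet<Long>> entry : retriableOffsets.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                // Real code would call consumer.seek(partition, lowestOffset) here.
                targets.put(entry.getKey(), entry.getValue().first());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, NavigableSet<Long>> retriable = new HashMap<>();
        retriable.put("topic-0", new TreeSet<>(Arrays.asList(7L, 5L, 9L)));
        System.out.println(seekTargets(retriable)); // prints {topic-0=5}
    }
}
```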
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl Thanks for the explanation. It makes more sense now.

I don't think we can meaningfully handle the case where the user has specified no cap on retries and one or more tuples keep failing. In the situation you describe, `maxUncommittedOffsets` isn't really solving the problem. If offset 3 fails and all other tuples can be acked, the spout will emit another `maxUncommittedOffsets` tuples on the same partition as 3, and will then stop emitting because it can't commit any tuples until 3 is acked. At this point, only offset 3 will get re-emitted until it hopefully succeeds at some point, so having a continually failing tuple (or just one that fails many times) will still "clog" the spout.

If `maxUncommittedOffsets` is removed, you're right that the `acked` map size is technically uncapped, but given the amount of information we store, and how the spout handles retries, I think it is a non-issue. Just to illustrate, say we remove `maxUncommittedOffsets`, and let's say again that offset 3 is failing a large number of times, and the last acked offset is far beyond it, e.g. 500. When 3 is ready for retry, the consumer is reset to that position on the relevant partition. This means it fetches and emits message 3. It will also have to fetch 4...500 (potentially over many calls to `poll()`), because we never seek forward to where we left off. The spout therefore has to spend time fetching and discarding all tuples up to 500 before it can finally emit another tuple. It seems likely that it'll hit the point where it is spending all its time fetching and discarding acked tuples earlier than it'll run out of memory to store their offsets.

Disregarding the issue with consumer seeking, because my reasoning relies on an implementation detail of how we do retries, I'm still not sure `maxUncommittedOffsets` is allowing higher throughput compared to not having it. If we allow the spout to fail with an `OutOfMemoryError`, the spout will have had higher throughput up to the crash than if it were being throttled by `maxUncommittedOffsets` (because otherwise it would also have had an OOME in that case). It really seems to me like all `maxUncommittedOffsets` is doing is trading having the spout potentially cause an OOME due to `acked` size, in exchange for making the spout react to the same situation by not emitting any more tuples. I'm not sure that is better, because data flow stops in either case.

`maxUncommittedOffsets` could have some value in a system where the `KafkaSpout` is a secondary stream source and the messages coming out of it aren't time sensitive. In that case it might be fine to let the `KafkaSpout` stop emitting tuples for a while if some tuples temporarily can't be acked, but having it cause an OOME would be too disruptive because the primary spout could be doing fine. I can't come up with a concrete example of this kind of configuration though. I'd be fine with keeping `maxUncommittedOffsets` for that kind of situation.
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

@srdo I am reading through your detailed explanation again and will let you know my opinion concerning the need, or no need, for the parameter `maxUncommittedOffsets`, based on all the facts you state. Prior to that, however, I will share right away the rationale behind `maxUncommittedOffsets` and why I think we still need it.

`maxUncommittedOffsets` is unrelated to `ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and to `Config#TOPOLOGY_MAX_SPOUT_PENDING`. Its purpose is to allow the `KafkaSpout` to keep polling/fetching records from Kafka while imposing a (memory consumption) limit on how many offsets can be **polled and not committed over ANY ARBITRARY NUMBER of polls to Kafka (not just one poll)** before the KafkaSpout stops polling. This limit is necessary because if, e.g., the tuple for offset **3** keeps failing, and all subsequent offsets **(4, 5, 6, ... 1M, ... 1B, ...)** are acked, the spout won't be able to commit to Kafka any offset greater than 3 unless it reaches **maxNumberOfRetrials**. If there is no limit on **maxNumberOfRetrials** (in order to guarantee at-least-once delivery), and offset 3 keeps failing forever, the spout won't ever be able to commit any offset > 3. In the limit this would cause the `Map acked` to grow to infinite size (despite its small footprint, which consists mainly of offsets and some small bookkeeping objects - it doesn't hold `ConsumerRecord`s). In order to put a cap on the size of this map, `maxUncommittedOffsets` was created to stop polling if this number is too high, and avoid `OutOfMemoryError`.

**The purpose of the KafkaSpout continuing to poll despite a particular offset (tuple) failing repeatedly is to increase the overall throughput.** If at some point offset 3 is successfully acked, the maximal run of contiguous acked offsets would be committed to Kafka in the next `Timer commitTimer` cycle, and the size of `Map acked` would be reduced (potentially by a lot). The spout could then keep doing its work and the overall throughput would be much higher.

`maxUncommittedOffsets` is not a hard limit parameter, and I don't think that we have to enforce that the KafkaSpout never exceeds this number. It is OK to exceed it as long as the JVM doesn't throw `OutOfMemoryError`. Currently, and prior to these proposed changes, in the worst case the number of uncommitted offsets is upper bounded by **maxUncommittedOffsets + maxFetchRecords - 1**, which is OK as long as there is memory.

Perhaps `maxUncommittedOffsets` is not the clearest name. We can either change the name (which can cause backward compatibility issues and/or confusion), or properly document what this parameter really means. The bottom line is that I think we need a parameter doing the job that `maxUncommittedOffsets` is currently doing, which I don't think can be accomplished with a combination of `ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and/or `Config#TOPOLOGY_MAX_SPOUT_PENDING`. A more precise name for `maxUncommittedOffsets` would be something like `allowPollingIfLessThanMaxUncommittedOffsets`.

Please let me know your thoughts. I will still go over your explanation again in the meantime.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl @HeartSaVioR I think if we can't think of a good use case for maxUncommittedOffsets, we'd be better off removing it.
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

@HeartSaVioR @srdo reviewing this right now.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl I guess you're busy, but could you continue reviewing this one, or let me know if you won't be available to review soon? I'd like to get this merged sooner rather than later, since it is necessary for 1.1.0.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/1924

+1 Once @hmcl finishes the review and there are no outstanding comments, I'll merge.

Btw, regarding maxUncommittedOffsets: yes, it provides more guarantees, but if we guarantee that commits occur periodically and in time (we're providing the option too), IMHO that's enough. The thing I have in mind is that committing in nextTuple() doesn't strictly guarantee commits happen in time. Two situations come to mind: reaching max spout pending, and backpressure being in place.

If the spout has reached max spout pending, since we emit **at most one tuple** per nextTuple() call, nextTuple() can be called again when a tuple tree completes or a tuple fails. At the latest, nextTuple() will be called after the tuple timeout (actually up to 2 * tuple timeout due to the underlying implementation). So if users can tolerate this it might be OK, but the delay could be far from the configured value. If backpressure is in place, nextTuple() will never be called, and we can't guarantee any time frame here.

If we want to strictly guarantee that commits occur in time, committing should happen apart from the spout's lifecycle, e.g. via a timer. That might also introduce some thread-safety handling, so it's a bit more complicated, but IMO simpler than respecting maxUncommittedOffsets.
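A timer-driven commit along the lines suggested above might look like the following sketch. Everything here is hypothetical (class and method names, the strictly in-order ack tracking), and the actual `KafkaConsumer.commitSync` call is stubbed out:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of committing apart from the spout lifecycle: a timer
// thread calls commitIfNeeded() periodically, so commits still happen even if
// backpressure prevents nextTuple() from being called. Shared state is guarded
// with synchronized, illustrating the extra thread-safety handling mentioned.
public class TimedCommitter {

    private long highestContiguousAcked = -1; // highest offset with no gaps before it
    private long committed = -1;

    // Called from the spout thread on ack. Only strictly in-order acks are
    // tracked here; real offset bookkeeping (out-of-order acks) is more involved.
    synchronized void onAck(long offset) {
        if (offset == highestContiguousAcked + 1) {
            highestContiguousAcked = offset;
        }
    }

    // Called from the timer thread (and possibly the spout thread too).
    synchronized boolean commitIfNeeded() {
        if (highestContiguousAcked > committed) {
            committed = highestContiguousAcked; // real code: consumer.commitSync(...)
            return true;
        }
        return false;
    }

    void start(long periodMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::commitIfNeeded, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        TimedCommitter committer = new TimedCommitter();
        committer.onAck(0);
        committer.onAck(1);
        System.out.println(committer.commitIfNeeded()); // prints true (commits up to offset 1)
        System.out.println(committer.commitIfNeeded()); // prints false (nothing new to commit)
    }
}
```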
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@HeartSaVioR I agree, I think it would be much simpler to rely on maxSpoutPending. maxUncommittedOffsets does have a slightly different meaning, though. With only maxSpoutPending, the spout might progress arbitrarily far past the last committed offset, since there's only a limit on how many tuples are in flight at once, rather than on how many tuples have been emitted past the committed offset. If a user needs to cap how far the spout can read ahead for some reason, they can't do that with maxSpoutPending. I can't tell if that's a real need anyone has, though. @hmcl any opinion?
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

@srdo apologies for the slight delay; I had a lot of deadlines last week. I will finish reviewing this today.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl I made the changes mentioned in my last comment. As far as I can tell there's no way to get the default max.poll.records out of Kafka, so for now I've made do with adding comments in KafkaSpoutConfig recommending that maxUncommittedOffsets and maxPollRecords should be equal.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

I think it would be good to get some of the assumptions I've made down in writing, so I just wrote down a more thorough explanation of the issues I think this PR should fix, and what changes I think it should make:

The intention of this PR is to fix an issue where the spout has emitted maxUncommittedOffsets (or more) tuples, some of them fail, and the spout then hangs because the `numUncommittedOffsets < maxUncommittedOffsets` check when deciding whether to poll doesn't account for retriable tuples. Blindly polling when there are retriable tuples will fix this issue, but leads to us not being able to bound the number of offsets the consumer has read past the last committed offset.

For an example of a case where we can't put a bound on uncommitted offsets, say that partition 0 has maxUncommittedOffsets + maxPollRecords emitted tuples past the commit offset, and partition 1 has no emitted tuples (or any number below maxUncommittedOffsets, it doesn't matter). When tuples fail and become retriable on partition 0, the spout would blindly poll for more tuples. If it gets tuples from partition 0 it's okay, since it seeks back to the committed offset for that partition, so it won't go beyond committedOffset + maxPollRecords (since the retry logic seeks back to the committed offset on every poll, the spout can't go more than maxPollRecords beyond that offset). If it gets tuples from partition 1 we're effectively just emitting new tuples while ignoring maxUncommittedOffsets. Since we aren't controlling which partitions the polled tuples may come from, we can't say anything meaningful about a cap on uncommitted offsets. While I think it would probably work out to being capped anyway, due to the consumer consuming partitions round robin (AFAIK), I'd prefer if the spout implementation doesn't make assumptions beyond those guaranteed by the consumer API (in which case we should assume any call to `KafkaConsumer.poll` could return messages for any assigned non-paused partition).

So the way to fix that issue is to ensure that if we're polling for retriable tuples, we only poll on those partitions that have retriable tuples (i.e. we pause the others when doing `doSeekRetriableTopicPartitions`, then resume all assigned partitions after calling `KafkaConsumer.poll`). Pausing the other partitions should only affect that poll cycle, since once retriable tuples get emitted they're no longer retriable, and we won't hit `doSeekRetriableTopicPartitions` for those tuples again right away. In most polls (no failed tuples, or tuples are failed but not ready for retry), we won't hit the pausing case.

If we pause partitions with no retriable tuples when polling on partitions with retriable tuples, we should be able to guarantee that any partition never gets more than maxUncommittedOffsets + maxPollRecords - 1 past the last committed offset. In the case where there are no failed tuples, we can reach the limit by having maxUncommittedOffsets - 1 emitted offsets, and polling once, getting up to maxPollRecords more. If there are retriable tuples, pausing will stop us from ignoring the maxUncommittedOffsets cap for partitions with no retriable tuples, and the partitions with retriable tuples won't get more than maxPollRecords beyond the last committed offset, since the consumer seeks back to that offset when polling for retriable offsets.

There's a second minor issue I'd like this PR to address: if maxPollRecords isn't exactly equal to maxUncommittedOffsets, the spout can behave in some undesirable ways.

* If maxPollRecords is greater than maxUncommittedOffsets, the maxUncommittedOffsets limit may be exceeded on any one poll. In this case there's no reason to have 2 separate variables, since the net effect is the same as setting maxUncommittedOffsets to be equal to maxPollRecords.
* If maxPollRecords is less than maxUncommittedOffsets, there's a risk of the spout getting stuck on some tuples for a while when it is retrying tuples. Say there are 10 retriable tuples following the last committed offset, and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 retriable tuples are re-emitted in the first batch, the next 5 tuples can't be emitted until (some of) the first 5 are acked. This is because the spout will seek the consumer back to the last committed offset any time there are failed tuples, which will lead to it getting the first 5 tuples out of the consumer, checking that they are emitted, and skipping them. This will repeat until the last committed offset moves. If there are other partitions with tuples available, those tuples may get emitted, but the "blocked" partition won't progress until some tuples are acked on it.

I think it might make sense to remove maxUncommittedOffsets entirely, and hav
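The pause-then-resume step described in this comment could be sketched as below. Partitions are modeled as plain `String`s and the helper name is made up; the real implementation would pass `TopicPartition`s to `KafkaConsumer.pause` and `KafkaConsumer.resume`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch: when polling for retries, pause every assigned
// partition that has no retriable tuples so the poll can only return records
// for partitions we actually want to retry; resume everything after the poll.
public class RetryPause {

    static Set<String> partitionsToPause(Set<String> assigned, Set<String> withRetriableTuples) {
        Set<String> toPause = new HashSet<>(assigned);
        toPause.removeAll(withRetriableTuples); // keep only partitions with nothing to retry
        return toPause;
    }

    public static void main(String[] args) {
        Set<String> assigned = new HashSet<>(Arrays.asList("t-0", "t-1", "t-2"));
        Set<String> withRetriable = new HashSet<>(Arrays.asList("t-1"));
        // Sorted copy for deterministic output.
        System.out.println(new TreeSet<>(partitionsToPause(assigned, withRetriable))); // prints [t-0, t-2]
    }
}
```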
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl yes, more or less.
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924

@srdo what you mean is that if `numUncommittedOffsets > maxUncommittedOffsets` the spout should still emit the retries, but not poll new records. Is that it?
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924 @hmcl Yes, I believe so. There is still a problem where the spout will stop emitting tuples if more than maxUncommittedOffsets tuples have failed.
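The policy being confirmed here could be sketched roughly like this (hypothetical helper names, not the actual spout API):

```java
public class PollGateDemo {
    // Under the policy discussed above, retries of already-failed tuples are
    // always allowed, since those offsets are already counted against the cap.
    static boolean mayEmitRetries() {
        return true;
    }

    // Brand-new records are only fetched while the number of uncommitted
    // offsets is still below the configured maximum.
    static boolean mayPollNewRecords(int numUncommittedOffsets, int maxUncommittedOffsets) {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(mayPollNewRecords(10, 10)); // false: cap reached, only retries
        System.out.println(mayPollNewRecords(9, 10));  // true: room for new records
    }
}
```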
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924 @srdo is this patch still relevant in the face of the other patches that got merged in?
Github user hmcl commented on the issue: https://github.com/apache/storm/pull/1924 @srdo Thanks for the note. I will take a look later today.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924 @hmcl I think this is ready for another look when you get a chance.
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924 This also changes the exponential backoff formula from currentTime + delayPeriod^(failCount-1) to currentTime + delayPeriod*2^(failCount-1). Multiplying the delay by itself causes the delay to grow extremely quickly, and probably wasn't intended. It might make sense to add jitter to the backoff as well.
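A quick comparison of the two formulas, assuming a base delay of 100 ms (the method names are illustrative, not the actual RetryService code):

```java
public class BackoffDemo {
    // Old (likely unintended) formula: currentTime + delayPeriod^(failCount-1).
    // The delay is multiplied by itself, so it explodes with the fail count.
    static long oldBackoff(long currentTimeMs, long delayPeriodMs, int failCount) {
        return currentTimeMs + (long) Math.pow(delayPeriodMs, failCount - 1);
    }

    // New formula: currentTime + delayPeriod * 2^(failCount-1),
    // the classic doubling exponential backoff.
    static long newBackoff(long currentTimeMs, long delayPeriodMs, int failCount) {
        return currentTimeMs + delayPeriodMs * (1L << (failCount - 1));
    }

    public static void main(String[] args) {
        // With delayPeriod = 100 ms and 3 failures:
        // old: 100^2 = 10,000 ms; new: 100 * 2^2 = 400 ms.
        System.out.println(oldBackoff(0, 100, 3)); // 10000
        System.out.println(newBackoff(0, 100, 3)); // 400
    }
}
```

Note the old formula is also degenerate on the first retry: 100^0 gives a 1 ms delay regardless of the configured period. Jitter (a small random offset added to each delay) would further spread out retries so failed tuples don't all come up for retry at the same instant.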
Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924 I think it would be best if we merged maxPollRecords and maxUncommittedOffsets, since having them be different has some undesirable side effects. @hmcl do you have an opinion? Also, this drops support for Kafka 0.9 in KafkaSpoutConfig, since I believe https://github.com/apache/storm/pull/1556 already means that this release won't work with 0.9.