[jira] [Created] (STORM-644) KafkaUtils repeat fetch messages which offset is out of range
Xin Wang created STORM-644:
-------------------------------

             Summary: KafkaUtils repeat fetch messages which offset is out of range
                 Key: STORM-644
                 URL: https://issues.apache.org/jira/browse/STORM-644
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill():

    ...
    try {
        msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    } catch (UpdateOffsetException e) {
        _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
        LOG.warn("Using new offset: {}", _emittedToOffset);
        // fetch failed, so don't update the metrics
        // fix bug: remove this offset from the failed list when it is out of range
        if (had_failed) {
            failed.remove(offset);
        }
        return;
    }
    ...
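To make the failure mode concrete, below is a minimal, self-contained sketch of the retry loop the report describes. It is not the actual PartitionManager source (the stand-in names fetchMessages, earliestAvailableOffset, and OffsetOutOfRangeException simplify the real KafkaUtils calls and UpdateOffsetException), but the control flow shows why the stale offset must be removed from the failed set.

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class FillLoopSketch {
        // Mirrors the fields named in the report: offsets of failed tuples
        // awaiting retry, and the offset to continue fetching from.
        private final SortedSet<Long> failed = new TreeSet<Long>();
        private long _emittedToOffset = 0L;

        void fill() {
            final boolean had_failed = !failed.isEmpty();
            // Retry the oldest failed offset first; otherwise continue from the head.
            final long offset = had_failed ? failed.first() : _emittedToOffset;
            try {
                fetchMessages(offset); // throws if Kafka has already expired this offset
            } catch (OffsetOutOfRangeException e) {
                _emittedToOffset = earliestAvailableOffset();
                // The fix from the report: without this removal, `offset` stays in
                // `failed`, the next fill() picks it again, and the spout loops
                // forever re-requesting an offset Kafka can no longer serve.
                if (had_failed) {
                    failed.remove(offset);
                }
            }
        }

        // Simplified stand-ins for the Kafka interaction (the real code uses
        // KafkaUtils.fetchMessages / KafkaUtils.getOffset and UpdateOffsetException).
        static class OffsetOutOfRangeException extends RuntimeException {}

        private void fetchMessages(long offset) {
            throw new OffsetOutOfRangeException(); // pretend the offset is gone
        }

        private long earliestAvailableOffset() {
            return 0L;
        }
    }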
[jira] [Closed] (STORM-644) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang closed STORM-644.
--------------------------
    Resolution: Fixed

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-644
                 URL: https://issues.apache.org/jira/browse/STORM-644
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Closed] (STORM-646) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang closed STORM-646.
--------------------------
    Resolution: Fixed

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-646
                 URL: https://issues.apache.org/jira/browse/STORM-646
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Closed] (STORM-647) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang closed STORM-647.
--------------------------
    Resolution: Fixed

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-647
                 URL: https://issues.apache.org/jira/browse/STORM-647
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Updated] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang updated STORM-643:
---------------------------
    Priority: Critical  (was: Major)

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-643
                 URL: https://issues.apache.org/jira/browse/STORM-643
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang
            Priority: Critical

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[worker-log]
2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
...

[FIX] storm.kafka.PartitionManager.fill():

    ...
    try {
        msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    } catch (UpdateOffsetException e) {
        _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
        LOG.warn("Using new offset: {}", _emittedToOffset);
        // fetch failed, so don't update the metrics
        // fix bug: remove this offset from the failed list when it is out of range
        if (had_failed) {
            failed.remove(offset);
        }
        return;
    }
    ...

also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.
[jira] [Created] (STORM-647) KafkaUtils repeat fetch messages which offset is out of range
Xin Wang created STORM-647:
-------------------------------

             Summary: KafkaUtils repeat fetch messages which offset is out of range
                 Key: STORM-647
                 URL: https://issues.apache.org/jira/browse/STORM-647
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Updated] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang updated STORM-643:
---------------------------
    Description:

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[worker-log]
2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717

[FIX] storm.kafka.PartitionManager.fill():

    ...
    try {
        msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    } catch (UpdateOffsetException e) {
        _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
        LOG.warn("Using new offset: {}", _emittedToOffset);
        // fetch failed, so don't update the metrics
        // fix bug: remove this offset from the failed list when it is out of range
        if (had_failed) {
            failed.remove(offset);
        }
        return;
    }
    ...

also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.

    was:

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Created] (STORM-645) KafkaUtils repeat fetch messages which offset is out of range
Xin Wang created STORM-645:
-------------------------------

             Summary: KafkaUtils repeat fetch messages which offset is out of range
                 Key: STORM-645
                 URL: https://issues.apache.org/jira/browse/STORM-645
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[GitHub] storm pull request: [STORM-639] storm-maven-plugin not found
Github user Lewuathe commented on the pull request:

    https://github.com/apache/storm/pull/403#issuecomment-72356826

    It seems that there is no way to install `storm-maven-plugin` into the local repository before compiling `storm-core`, because at the `compile` phase there are not yet any jar files to install. So I think there are two options:

    1. Update the documentation to say that `storm-maven-plugin` must be installed before compiling the `storm-core` project.
    2. Deploy `storm-maven-plugin` to a remote repository.

    What do you think? If someone has any other ideas, please let me know.
[jira] [Updated] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang updated STORM-643:
---------------------------
    Description:

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[worker-log]
2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
...

[FIX] storm.kafka.PartitionManager.fill(): ...

also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.

    was:

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[worker-log]
2015-02-01 10:24:27.231+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.232+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717
2015-02-01 10:24:27.333+0800 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [20919071816]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-02-01 10:24:27.334+0800 s.k.PartitionManager [WARN] Using new offset: 20996130717

[FIX] storm.kafka.PartitionManager.fill(): ...

also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.
[jira] [Closed] (STORM-645) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xin Wang closed STORM-645.
--------------------------
    Resolution: Fixed

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-645
                 URL: https://issues.apache.org/jira/browse/STORM-645
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...
[jira] [Commented] (STORM-637) Integrate PartialKeyGrouping into storm API
     [ https://issues.apache.org/jira/browse/STORM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300268#comment-14300268 ]

ASF GitHub Bot commented on STORM-637:
--------------------------------------

Github user gdfm commented on the pull request:

    https://github.com/apache/storm/pull/404#issuecomment-72374010

    Looks pretty neat! +1

Integrate PartialKeyGrouping into storm API
-------------------------------------------

                 Key: STORM-637
                 URL: https://issues.apache.org/jira/browse/STORM-637
             Project: Apache Storm
          Issue Type: Improvement
            Reporter: Robert Joseph Evans

STORM-632 adds support for PartialKey Grouping. It would be really nice to integrate this fully with the storm APIs. So this would mean:

1. Update PartialKeysGrouping to optionally take a Fields parameter
2. BoltDeclarer to have a partialKeysGrouping API
3. TridentTopologyBuilder to have a partialKeysGrouping API
4. Documentation on how to use a partialKeysGrouping, including some examples.
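For concreteness, here is a hedged sketch of what the requested integration could look like to a topology author. PartialKeyGrouping itself exists (from STORM-632), and TestWordSpout/TestWordCounter are Storm's own test helpers; the Fields parameter and the partialKeysGrouping method on BoltDeclarer are exactly what this ticket proposes, so the commented-out lines show the proposed shape, not existing API.

    import backtype.storm.grouping.PartialKeyGrouping;
    import backtype.storm.testing.TestWordCounter;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.TopologyBuilder;

    public class PartialKeyGroupingSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("words", new TestWordSpout(), 2);

            // Today (per STORM-632): wired up as a custom grouping.
            builder.setBolt("count", new TestWordCounter(), 4)
                   .customGrouping("words", new PartialKeyGrouping());

            // Proposed by this ticket: a Fields parameter plus a first-class
            // BoltDeclarer method, roughly equivalent to the lines above:
            // builder.setBolt("count", new TestWordCounter(), 4)
            //        .partialKeysGrouping("words", new Fields("word"));
        }
    }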
[GitHub] storm pull request: STORM-637: Integrate PartialKeyGrouping into s...
Github user gdfm commented on the pull request:

    https://github.com/apache/storm/pull/404#issuecomment-72374010

    Looks pretty neat! +1
[jira] [Commented] (STORM-162) Load Balancing Shuffle Grouping
     [ https://issues.apache.org/jira/browse/STORM-162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300280#comment-14300280 ]

Gianmarco De Francisci Morales commented on STORM-162:
-------------------------------------------------------

Thanks [~revans2], this looks like an interesting problem. I think we are speaking about two different problems here. One is the classic balls-and-bins problem, which is caused by the single-choice paradigm and the use of a hash function. As far as I understand, this has nothing to do with shuffle grouping, which uses round-robin message distribution. The guarantee is that downstream bolts will always get the same number of messages.

Now, the real problem seems to be that this guarantee is not enough in heterogeneous, dynamic environments. That is, we would like to modulate the number of messages sent downstream in proportion to the speed of the bolt receiving them.

A relevant piece of literature is Flux (http://db.cs.berkeley.edu/papers/icde03-fluxlb.pdf). While it has a different concept of repartitioning, uses a central controller, and is mainly intended for stateful operators and key grouping (fields grouping in Storm parlance), I think some ideas from the paper could be adapted.

This issue is definitely related to some research problems we have been looking into. Although we do not have a solution at the moment, we will definitely keep investigating. My intuition is that a fully decentralized solution that does not require sending load information back should be possible.

Coming now to the patch, the {{localBoltLoad}} definition makes sense to me. I am not sure about the {{messagingClientLoad}}, as I am not too familiar with Storm's internals. Which messages is it counting? In general, I think a mixture of network and CPU load might be a better measure (e.g., messages + idle time).

For short-term spikes, I have a question. Is there any back-pressure mechanism in the current messaging code? That is, does the messaging follow a pull model from the downstream operator? If so, the short-term buffer Flux proposes might be a good solution.

Load Balancing Shuffle Grouping
-------------------------------

                 Key: STORM-162
                 URL: https://issues.apache.org/jira/browse/STORM-162
             Project: Apache Storm
          Issue Type: Wish
            Reporter: James Xu
            Assignee: Robert Joseph Evans
            Priority: Minor

https://github.com/nathanmarz/storm/issues/571

Hey @nathanmarz, I think that the current shuffle grouping is creating very obvious hot-spots in load on hosts here at Twitter. The reason is that randomized message distribution to the workers is susceptible to the balls-and-bins problem (http://pages.cs.wisc.edu/~shuchi/courses/787-F07/scribe-notes/lecture07.pdf): the odds that some particular queue gets bogged down when you're assigning tasks randomly are high. You can solve this problem with a load-aware shuffle grouping: when shuffling, prefer tasks with lower load. What would it take to implement this feature?

--

sritchie: Looks like Rap Genius was heavily affected when Heroku started running a shuffle grouping on tasks to its dynos: http://rapgenius.com/James-somers-herokus-ugly-secret-lyrics. A 50x performance degradation compared to a more intelligent load-balancing scheme that only sent tasks to non-busy dynos. Seems very relevant to Storm.

--

nathanmarz: It's doing randomized round robin, not fully random distribution, so every downstream task gets the same number of messages. But yes, I agree that this would be a great feature. Basically what this requires is making stats of downstream tasks available to the stream grouping code. The best way to implement this would be:

1. Implement a broadcast message type in the networking code, so that one can efficiently send a large object to all tasks in a worker (rather than having to send N copies of that large message).
2. Have a single executor in every topology that polls nimbus for accumulated stats once per minute and then broadcasts that information to all tasks in all workers.
3. Wire up the task code to pass that information along from the task to the outgoing stream groupings for that task (adding appropriate methods to the CustomStreamGrouping interface to receive the stats info).

--

sorenmacbeth: @nathanmarz @sritchie Did any progress ever get made on this? Is the description above still relevant to Storm 0.9.0? We are getting bitten by this problem and would love to see something like this implemented.
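To make the "prefer tasks with lower load" idea concrete, here is a minimal sketch of a load-aware grouping using the classic power-of-two-choices trick, which avoids the worst of the balls-and-bins effect without global coordination. The CustomStreamGrouping interface is Storm's real 0.9.x API; the load map and the updateLoad hook are hypothetical stand-ins for the stats-broadcast plumbing nathanmarz describes, which does not exist yet.

    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.WorkerTopologyContext;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;

    public class TwoChoiceLoadGrouping implements CustomStreamGrouping {
        private List<Integer> targetTasks;
        private transient Random random;
        // Hypothetical: task id -> current load estimate, fed by whatever
        // stats-broadcast mechanism ends up being built (see the list above).
        private final Map<Integer, Double> loadByTask = new ConcurrentHashMap<Integer, Double>();

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                            List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            this.random = new Random();
        }

        // Hypothetical hook: called when fresh load stats arrive for a task.
        public void updateLoad(int taskId, double load) {
            loadByTask.put(taskId, load);
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            // Power of two choices: sample two candidate tasks at random and
            // route the tuple to whichever currently reports the lower load.
            int a = targetTasks.get(random.nextInt(targetTasks.size()));
            int b = targetTasks.get(random.nextInt(targetTasks.size()));
            Double loadA = loadByTask.get(a);
            Double loadB = loadByTask.get(b);
            double la = loadA == null ? 0.0 : loadA;
            double lb = loadB == null ? 0.0 : loadB;
            return Arrays.asList(la <= lb ? a : b);
        }
    }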
questions on task, threads and workers
Hi, I have a few simple questions.

1) In Storm 0.9.x, what is the default value for a bolt's number of tasks? According to the docs, the parallelism hint no longer sets the number of tasks, but the number of executor threads.
2) What happens if the number of tasks is less than the number of threads? Should I assume this results in idle threads?
3) Does the number of workers multiply the number of tasks and threads?

feedback appreciated,
Clay
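Not authoritative, but the relationships behind these questions are easiest to see in code. In the 0.9.x API the parallelism hint sets the number of executor threads, setNumTasks sets the number of task instances (it defaults to the parallelism hint when unset), and setNumWorkers sets the number of worker processes the executors are spread across; workers do not multiply tasks or threads, and as I understand it Storm will not run more executors than there are tasks. A minimal sketch using Storm's own test spout and bolt:

    import backtype.storm.Config;
    import backtype.storm.testing.TestWordCounter;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.TopologyBuilder;

    public class ParallelismSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();

            // 2 executor threads for the spout; task count defaults to the hint (2).
            builder.setSpout("words", new TestWordSpout(), 2);

            // 4 executor threads running 8 tasks (2 tasks per thread).
            builder.setBolt("count", new TestWordCounter(), 4)
                   .setNumTasks(8)
                   .shuffleGrouping("words");

            Config conf = new Config();
            // The executors above are distributed across 2 worker processes;
            // changing this moves threads around but creates no new tasks.
            conf.setNumWorkers(2);
        }
    }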
[GitHub] storm pull request: Update PartitionManager.java
GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/405

    Update PartitionManager.java

    fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm 0.9.3-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/405.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #405

commit 9280a948efaf28ab4de019060435d46731abd375
Author: vesense <best.wang...@163.com>
Date:   2015-02-02T06:24:14Z

    Update PartitionManager.java
    fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range
[jira] [Commented] (STORM-643) KafkaUtils repeat fetch messages which offset is out of range
     [ https://issues.apache.org/jira/browse/STORM-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14300943#comment-14300943 ]

ASF GitHub Bot commented on STORM-643:
--------------------------------------

GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/405

    Update PartitionManager.java

    fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm 0.9.3-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/405.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #405

commit 9280a948efaf28ab4de019060435d46731abd375
Author: vesense <best.wang...@163.com>
Date:   2015-02-02T06:24:14Z

    Update PartitionManager.java
    fix bug [STORM-643] KafkaUtils repeat fetch messages which offset is out of range

KafkaUtils repeat fetch messages which offset is out of range
-------------------------------------------------------------

                 Key: STORM-643
                 URL: https://issues.apache.org/jira/browse/STORM-643
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka
    Affects Versions: 0.9.2-incubating, 0.9.3
            Reporter: Xin Wang
            Assignee: Xin Wang
            Priority: Critical

KafkaUtils repeatedly fetches messages whose offset is out of range. This happens when the failed list (SortedSet<Long> failed) is not empty and some offset in it is out of range.

[FIX] storm.kafka.PartitionManager.fill(): ...

also: the log message "retrying with default start offset time from configuration. configured start offset time: [-2]" is incorrect.