[jira] [Resolved] (STORM-2505) Kafka Spout doesn't support voids in the topic (topic compaction not supported)
[ https://issues.apache.org/jira/browse/STORM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vivek Mittal resolved STORM-2505. - Resolution: Fixed Fix Version/s: 1.x > Kafka Spout doesn't support voids in the topic (topic compaction not > supported) > --- > > Key: STORM-2505 > URL: https://issues.apache.org/jira/browse/STORM-2505 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka >Affects Versions: 1.x >Reporter: Vivek Mittal > Fix For: 1.x > > Time Spent: 4h 50m > Remaining Estimate: 0h > > Kafka maintains the spout's progress (committed offsets per partition), which can > point to an offset that no longer exists (or whose successor, offset+1, no longer > exists) in the topic, for the following reasons: > * The topology stopped processing (or died) & the topic got compacted > (cleanup.policy=compact), leaving offset voids in the topic. > * The topology stopped processing (or died) & the topic got cleaned up > (cleanup.policy=delete), so the committed offset no longer exists in the topic. > When the topology starts processing again (or is restarted), the spout logic > requires the next offset to be (committedOffset+1) for the spout to make > progress, which will never happen because (committedOffset+1) has been removed > from the topic and will never be acked. > {code:title=OffsetManager.java|borderStyle=solid} > if (currOffset == nextCommitOffset + 1) { // found the next > offset to commit > found = true; > nextCommitMsg = currAckedMsg; > nextCommitOffset = currOffset; > } else if (currOffset > nextCommitOffset + 1) { > LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will > be processed in a subsequent batch.", tp, currOffset); > } > {code} > A smarter forwarding mechanism is needed to advance the spout's commit pivot to > the next available offset, instead of the hardcoded single-step advance. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
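A fix in this direction could let the commit pivot jump over a void once the broker's earliest available offset proves the missing records can never arrive. The following is a minimal, hypothetical sketch (the class and method names are illustrative, not the actual OffsetManager API), assuming the earliest available offset per partition can be queried, e.g. via KafkaConsumer#beginningOffsets:

```java
import java.util.SortedSet;

// Hypothetical sketch of gap-aware commit advancement; not the actual
// OffsetManager code. "earliestAvailable" would come from the broker
// (e.g. KafkaConsumer#beginningOffsets).
public class OffsetGapSketch {
    public static long nextCommitOffset(long committed, long earliestAvailable,
                                        SortedSet<Long> acked) {
        long next = committed;
        // The offsets right after the commit point were compacted or deleted:
        // fast-forward the pivot to just before the earliest offset that still exists.
        if (earliestAvailable > next + 1) {
            next = earliestAvailable - 1;
        }
        for (long offset : acked) {
            if (offset == next + 1) {
                next = offset;   // contiguous acked offset: safe to commit
            } else if (offset > next + 1) {
                break;           // genuine gap: wait for more acks
            }
        }
        return next;
    }
}
```

With committed=4 and nothing compacted, acked offsets {5, 6, 8} would advance the pivot to 6; if everything up to offset 9 was compacted away (earliestAvailable=10), acked {10, 11} would advance it to 11 instead of stalling forever at 4.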
[jira] [Created] (STORM-2540) Get rid of window compaction in WindowManager
Stig Rohde Døssing created STORM-2540: - Summary: Get rid of window compaction in WindowManager Key: STORM-2540 URL: https://issues.apache.org/jira/browse/STORM-2540 Project: Apache Storm Issue Type: Improvement Components: storm-client Affects Versions: 2.0.0 Reporter: Stig Rohde Døssing Assignee: Stig Rohde Døssing Storm's windowing support uses trigger and eviction policies to control the size of the windows passed to windowed bolts. The WindowManager has a hard-coded limit of 100 tuples before tuples will start getting evicted from the window, probably as an attempt to avoid excessively large windows when using time-based eviction policies. Whenever a tuple is added to the window, the hard cap is checked, and if the number of tuples in the window exceeds the cap, the WindowManager evaluates the EvictionPolicy for the tuples to figure out if some can be removed. This hard cap is ineffective in most configurations, and has a surprising interaction with the count-based policy. If the windowing bolt is configured to use timestamp fields in the tuples to determine the current time, the watermark-based policy classes are used. In this configuration, the compaction isn't doing anything, because tuples cannot be evicted until the WatermarkGenerator sends a new watermark, and when it does, the TriggerPolicy causes the WindowManager to evict any expired tuples anyway. If the windowing bolt is using the count-based policy, compaction has the unexpected effect of hard-capping the user's configured max count to 100. If the configured count is less than 100, the compaction again has no effect. When the bolt is configured to use the tuple-arrival-time based policy, the compaction only has an effect if there are tuples older than the configured window duration, which only happens if the window happens to trigger slightly late. This can cause tuples to be evicted from the window before the user's bolt sees them. 
Even when tuples are evicted by the compaction mechanism, they are kept in memory until the next time a window is presented to the user's bolt. I think the compaction mechanism should be removed. The only policy that benefits is the time-based policy, and in that case it would be better to just add a configurable max tuple count to that policy. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
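The suggested alternative, a configurable max tuple count on the time-based policy, could look roughly like this. This is a hedged sketch with illustrative names, not Storm's actual EvictionPolicy interface:

```java
// Illustrative sketch of a time-based eviction check with a configurable
// count cap, replacing the hard-coded 100-tuple compaction. Names are
// hypothetical, not Storm's actual EvictionPolicy API.
public class CappedTimeEvictionSketch {
    private final long windowLengthMs;
    private final int maxCount; // configurable, instead of the hard-coded 100

    public CappedTimeEvictionSketch(long windowLengthMs, int maxCount) {
        this.windowLengthMs = windowLengthMs;
        this.maxCount = maxCount;
    }

    /** Returns true if a tuple with the given timestamp should be evicted. */
    public boolean evict(long tupleTs, long referenceTs, int currentCount) {
        boolean expired = referenceTs - tupleTs >= windowLengthMs;
        boolean overCap = currentCount > maxCount;
        return expired || overCap;
    }
}
```

This keeps the memory-bounding benefit for the time-based policy while leaving the count-based and watermark-based policies untouched.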
[jira] [Assigned] (STORM-2514) Incorrect logs for mapping between Kafka partitions and task IDs
[ https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hugo Louro reassigned STORM-2514: - Assignee: Hugo Louro > Incorrect logs for mapping between Kafka partitions and task IDs > > > Key: STORM-2514 > URL: https://issues.apache.org/jira/browse/STORM-2514 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client >Reporter: Srishty Agrawal >Assignee: Hugo Louro > Attachments: NewClass.java, worker.log > > > While working on > [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker > logs were generated with debug mode on. The information printed about the mapping > between task IDs and Kafka partitions contradicted my assumptions. I > ran a topology which used KafkaSpout from the storm-kafka-client module; it > had a parallelism hint of 2 (number of executors) and a total of 16 tasks. > The log lines below show the assigned mapping between executors and > Kafka partitions: > {noformat} > o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] > Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] > for group kafkaSpoutTestGroup > o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions > reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, > consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, > topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]] > o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] > Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] > for group kafkaSpoutTestGroup > o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions > reassignment. 
[taskID=15, consumer-group=kafkaSpoutTestGroup, > consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, > topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]] > {noformat} > It is evident that only tasks (with IDs 3, 4, 5, 6, 7, 8, 9, 10) in Executor1 > (3 10) will be reading from Kafka partitions 4, 5, 6 and 7. Similarly, tasks > in Executor2 (11 18) will be reading from Kafka partitions 0, 1, 2 and 3. > These log lines are being printed by tasks with IDs 10 and 15 in the respective > executors. > Logs which emit individual messages do not abide by the above assumption. For > example, in the log below, Task ID 3 (I added code, as part of > debugging STORM-2506, to print the task ID right next to the component ID), which > runs on Executor1, reads from partition 2 (the second value inside the square > brackets) instead of 4, 5, 6 or 7. > {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 > default [8topic, 2, 0, null, 1]{noformat} > This behavior is summarized in the table below: > {noformat} > Task IDs 3, 4, 7, 8, 9, 11, 15, 18 --- Partitions 0, 1, 2, 3 > Task IDs 5, 6, 10, 12, 13, 14, 16, 17 --- Partitions 4, 5, 6, 7 > {noformat} > [You can find the relevant parts of the log file > here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351] > > Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, > 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to > Executor2? Are (3 10) not the starting and ending task IDs in Executor1? > Another interesting thing to note is that Task IDs 10 and 15 are always > reading from the partitions they claimed to be reading from (while setting > partition assignments). > If my assumptions are correct, there is a bug in the way the mapping > information is being passed to worker logs. If not, we need to make changes > to our docs. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
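For readers following along: Storm names an executor by the inclusive range of task IDs it hosts, so "executor[3 10]" should host tasks 3 through 10. The tiny sketch below (a hypothetical helper, not Storm code) just encodes the assumption the reporter is questioning:

```java
// Hypothetical helper encoding the naming convention under discussion:
// an executor labeled "[start end]" hosts the inclusive task-ID range.
public class ExecutorRangeSketch {
    public static boolean ownsTask(int startTaskId, int endTaskId, int taskId) {
        return taskId >= startTaskId && taskId <= endTaskId;
    }
}
```

Under this convention, task 15 belongs to executor [11 18], not [3 10], which is why the per-message log lines above look contradictory.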
[jira] [Updated] (STORM-2539) Locality aware grouping, which is a new grouping method considering locality
[ https://issues.apache.org/jira/browse/STORM-2539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siwoon Son updated STORM-2539: -- Description: I’d like to propose a new grouping method. This method solves problems that can occur with the existing _Shuffle grouping_ and _Local-or-shuffle grouping_ methods. I was motivated by the following question. bq. "Would it not be more efficient to transfer tuples to nearby nodes?" Among Storm's grouping methods, _Shuffle grouping_ randomly selects a task of the next node. In this method, all nodes can receive data evenly, but the same amount of data is transferred to relatively distant nodes, which can cause high delay. To solve this problem, Storm provides Local-or-shuffle grouping, which considers locality. _Local-or-shuffle grouping_ can minimize the delay by processing data within the same process if a receiving task is in that process. Local-or-shuffle grouping, however, considers *only locality*, which may lead to two load balancing problems: +sender imbalance+ and +receiver imbalance+. First, the sender imbalance problem is a load balancing problem in which traffic is concentrated on a specific task because the number of senders’ tasks and the number of nodes are not equal. Next, the receiver imbalance problem is a load balancing problem in which traffic is concentrated on a specific task because the number of receivers' tasks and the number of nodes are not equal. If these problems occur, the tasks of receivers will perform different amounts of work, resulting in performance degradation and processing delays. |!sender-imbalance.png|width=400!|!receiver-imbalance.png|width=400!| |(a) Example of sender imbalance problem.|(b) Example of receiver imbalance problem.| So, I propose locality aware grouping, which can solve the load balancing problems while considering locality. 
Locality aware grouping is a method of periodically measuring the ping response time between nodes and probabilistically transmitting more tuples to nodes with low response times. I implemented the proposed locality aware grouping at [https://github.com/dke-knu/i2am/tree/master/i2am-app/locality-aware-grouping]. LocalityAwareGrouping.java is the class that implements locality aware grouping. LocalityGroupingTestTopology.java, TupleGeneratorSpout.java, and PerformanceLoggingBolt.java are topology classes for testing it. The LocalityAwareGrouping$prepare() method reads each node's network information from Zookeeper and starts a thread. This thread periodically measures the ping response time of each node. The LocalityAwareGrouping$chooseTasks() method selects tasks on nodes with lower network response times with higher probability. The implementation is straightforward, but has one caveat: to calculate pings between nodes, the network information of the nodes where tasks are running is needed. I got this information from Zookeeper using the Zookeeper$getData() method. To create a Zookeeper object, however, I had no choice but to require the Zookeeper connection information from the user. Please let me know if there is a way to get each node's network information without requiring additional parameters from the user, and if you have any additional comments. Thank you for reading. was: I’d like to propose a new grouping method. This method solves problems that can occur with the existing _Shuffle grouping_ and _Local-or-shuffle grouping_ methods. I was motivated by the following question. bq. "Would it not be more efficient to transfer tuples to nearby nodes?" Among Storm's grouping methods, _Shuffle grouping_ randomly selects a task of the next node. In this method, all nodes can receive data evenly, but the same amount of data is transferred to relatively distant nodes, which can cause high delay. 
To solve this problem, Storm provides Local-or-shuffle grouping, which considers locality. _Local-or-shuffle grouping_ can minimize the delay by processing data within the same process if a receiving task is in that process. Local-or-shuffle grouping, however, considers *only locality*, which may lead to two load balancing problems: +sender imbalance+ and +receiver imbalance+. First, the sender imbalance problem is a load balancing problem in which traffic is concentrated on a specific task because the number of senders’ tasks and the number of nodes are not equal. Next, the receiver imbalance problem is a load balancing problem in which traffic is concentrated on a specific task because the number of receivers' tasks and the number of nodes are not equal. If these problems occur, the tasks of receivers will perform different amounts of work, resulting in performance degradation and processing delays.
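The chooseTasks idea described above can be sketched as inverse-latency weighting: each candidate task's selection probability is proportional to 1/ping. This is a hedged illustration of the scheme, not the linked LocalityAwareGrouping implementation:

```java
// Illustrative inverse-latency weighting for locality aware grouping.
// Not the linked LocalityAwareGrouping code; just the probabilistic idea.
public class LocalityWeightSketch {
    /** weight_i = (1/ping_i) / sum_j (1/ping_j) */
    public static double[] weights(double[] pingMs) {
        double[] w = new double[pingMs.length];
        double sum = 0;
        for (int i = 0; i < pingMs.length; i++) {
            w[i] = 1.0 / pingMs[i];
            sum += w[i];
        }
        for (int i = 0; i < w.length; i++) {
            w[i] /= sum;
        }
        return w;
    }

    /** Maps a uniform draw in [0,1) to a task index via cumulative weights. */
    public static int chooseTask(double[] pingMs, double uniformDraw) {
        double[] w = weights(pingMs);
        double cum = 0;
        for (int i = 0; i < w.length; i++) {
            cum += w[i];
            if (uniformDraw < cum) {
                return i;
            }
        }
        return w.length - 1; // guard against floating-point rounding
    }
}
```

Under this weighting, a node with a 1 ms ping would receive three times as many tuples as a node with a 3 ms ping (weights 0.75 and 0.25).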