[jira] [Resolved] (STORM-2505) Kafka Spout doesn't support voids in the topic (topic compaction not supported)

2017-05-31 Thread Vivek Mittal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vivek Mittal resolved STORM-2505.
-
   Resolution: Fixed
Fix Version/s: 1.x

> Kafka Spout doesn't support voids in the topic (topic compaction not 
> supported)
> ---
>
> Key: STORM-2505
> URL: https://issues.apache.org/jira/browse/STORM-2505
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.x
>Reporter: Vivek Mittal
> Fix For: 1.x
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Kafka maintains the spout's progress (the committed offset for each partition), and that committed offset can refer to a position that no longer exists (or whose offset+1 no longer exists) in the topic, for the following reasons:
> * The topology stopped processing (or died) and the topic got compacted (cleanup.policy=compact), leaving offset voids in the topic.
> * The topology stopped processing (or died) and the topic got cleaned up (cleanup.policy=delete), so the committed offset is no longer present in the topic.
> When the topology starts processing again (or is restarted), the spout logic expects the next offset to be (committedOffset+1) for the spout to make progress, which will never happen because (committedOffset+1) has been removed from the topic and will never be acked.
> {code:title=OffsetManager.java|borderStyle=solid}
> if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
>     found = true;
>     nextCommitMsg = currAckedMsg;
>     nextCommitOffset = currOffset;
> } else if (currOffset > nextCommitOffset + 1) {
>     LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
> }
> {code}
> A smarter forwarding mechanism has to be built that moves the spout's commit pivot to the next logically available offset, instead of the hardcoded single-step advance.
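
One possible direction, sketched here purely for illustration (this is not the STORM-2505 patch): when the offset the spout is waiting for has fallen below the partition's earliest still-available offset, advance the commit pivot to that earliest offset. The helper OffsetGapSkipper and its method name are hypothetical; the sketch assumes the committing code has access to the underlying KafkaConsumer, and it only covers offsets lost off the start of a partition (mid-partition compaction voids would need a different check, e.g. against the offset of the next record actually fetched).

{code:title=Illustrative gap-skip sketch|borderStyle=solid}
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch only, not the actual fix: compute a commit offset such
// that (offset + 1) is still fetchable, skipping positions removed by
// retention so the spout can keep making progress.
public final class OffsetGapSkipper {

    private OffsetGapSkipper() {
    }

    public static long skipUnavailableOffsets(KafkaConsumer<?, ?> consumer,
                                              TopicPartition tp,
                                              long nextCommitOffset) {
        // beginningOffsets is an existing KafkaConsumer API (Kafka >= 0.10.1).
        long earliestAvailable = consumer
                .beginningOffsets(Collections.singleton(tp))
                .get(tp);
        if (nextCommitOffset + 1 < earliestAvailable) {
            // (nextCommitOffset + 1) no longer exists in the topic; move the
            // commit pivot forward to the next logically available position.
            return earliestAvailable - 1;
        }
        return nextCommitOffset;
    }
}
{code}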



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (STORM-2540) Get rid of window compaction in WindowManager

2017-05-31 Thread JIRA
Stig Rohde Døssing created STORM-2540:
-

 Summary: Get rid of window compaction in WindowManager
 Key: STORM-2540
 URL: https://issues.apache.org/jira/browse/STORM-2540
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-client
Affects Versions: 2.0.0
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


Storm's windowing support uses trigger and eviction policies to control the 
size of the windows passed to windowed bolts. The WindowManager has a 
hard-coded limit of 100 tuples before it starts trying to evict tuples from the 
window, probably as an attempt to avoid overly large windows when using 
time-based eviction policies. Whenever a tuple is added to the window, this hard 
cap is checked, and if the number of tuples in the window exceeds the cap the 
WindowManager evaluates the EvictionPolicy for the tuples to figure out whether 
some can be removed.
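
A minimal, illustrative-only model of this behaviour (not the actual WindowManager source; the names are made up for the sketch):

{code:title=Illustrative model of the compaction check|borderStyle=solid}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Illustrative sketch of the behaviour described above: every add checks a
// hard-coded cap and, once the window holds more tuples than the cap, asks
// the eviction policy whether older tuples may be dropped.
public class CompactingWindow<T> {
    private static final int COMPACTION_CAP = 100;   // the hard-coded limit

    private final ConcurrentLinkedQueue<T> window = new ConcurrentLinkedQueue<>();
    private final Predicate<T> evictionPolicy;       // true => tuple may be evicted

    public CompactingWindow(Predicate<T> evictionPolicy) {
        this.evictionPolicy = evictionPolicy;
    }

    public void add(T tuple) {
        window.add(tuple);
        if (window.size() > COMPACTION_CAP) {
            compact();
        }
    }

    private void compact() {
        // Evict only what the policy allows; with watermark-based policies this
        // typically removes nothing, which is why the cap is largely ineffective.
        window.removeIf(evictionPolicy);
    }
}
{code}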

This hard cap is ineffective in most configurations, and has a surprising 
interaction with the count based policy.

If the windowing bolt is configured to use timestamp fields in the tuples to 
determine the current time, the WatermarkingXPolicy classes are used. In this 
configuration, the compaction isn't doing anything because tuples cannot be 
evicted until the WatermarkGenerator sends a new watermark, and when it does 
the TriggerPolicy causes the WindowManager to evict any expired tuples anyway.

If the windowing bolt is using the count based policy, compaction has the 
unexpected effect of hard capping the user's configured max count to 100. If 
the configured count is less than 100, the compaction again has no effect.

When the bolt is configured to use the tuple arrival time based policy, the 
compaction only has an effect if there are tuples older than the configured 
window duration, which only happens if the window happens to trigger slightly 
late. This can cause tuples to be evicted from the window before the user's 
bolt sees them. Even when tuples are evicted with the compaction mechanism they 
are kept in memory until the next time a window is presented to the user's bolt.

I think the compaction mechanism should be removed. The only policy that 
benefits is the time based policy, and in that case it would be better to just 
add a configurable max tuple count to that policy. 
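
If the hard cap were removed, the suggested alternative could look roughly like the following. This is a hypothetical sketch, not an existing Storm class; the class and field names are made up:

{code:title=Sketch of a time-based eviction policy with an optional tuple cap|borderStyle=solid}
// Hypothetical sketch: a time-based eviction policy that also honours an
// optional, user-configurable tuple cap instead of a hard-coded threshold.
public class CappedTimeEvictionPolicy {
    private final long windowLengthMs;
    private final long maxTupleCount;   // e.g. Long.MAX_VALUE when the user sets no cap
    private long trackedCount;

    public CappedTimeEvictionPolicy(long windowLengthMs, long maxTupleCount) {
        this.windowLengthMs = windowLengthMs;
        this.maxTupleCount = maxTupleCount;
    }

    /** Called for each tuple added to the window. */
    public void track() {
        trackedCount++;
    }

    /** Returns true if a tuple with the given timestamp should leave the window. */
    public boolean evict(long tupleTimestampMs, long referenceTimeMs) {
        boolean expired = referenceTimeMs - tupleTimestampMs >= windowLengthMs;
        boolean overCap = trackedCount > maxTupleCount;
        if (expired || overCap) {
            trackedCount = Math.max(0, trackedCount - 1);
            return true;
        }
        return false;
    }
}
{code}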



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (STORM-2514) Incorrect logs for mapping between Kafka partitions and task IDs

2017-05-31 Thread Hugo Louro (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hugo Louro reassigned STORM-2514:
-

Assignee: Hugo Louro

> Incorrect logs for mapping between Kafka partitions and task IDs
> 
>
> Key: STORM-2514
> URL: https://issues.apache.org/jira/browse/STORM-2514
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Reporter: Srishty Agrawal
>Assignee: Hugo Louro
> Attachments: NewClass.java, worker.log
>
>
> While working on [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker logs were generated with debug mode on. The information printed about the mapping between task IDs and Kafka partitions contradicted my assumptions. I ran a topology which used KafkaSpout from the storm-kafka-client module; it had a parallelism hint of 2 (the number of executors) and a total of 16 tasks. The log lines below show the assigned mapping between executors and Kafka partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO] Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce, topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO] Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126, topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> From these lines, the tasks (with IDs 3, 4, 5, 6, 7, 8, 9, 10) in Executor1 (3 10) should be reading only from Kafka partitions 4, 5, 6, and 7. Similarly, the tasks in Executor2 (11 18) should be reading from Kafka partitions 0, 1, 2, and 3. These log lines are printed by the tasks with IDs 10 and 15 in the respective executors.
> The logs for individually emitted messages do not follow this assumption. For example, in the log line below, task 3 (as part of debugging STORM-2506 I added code to print the task ID right next to the component ID; a sketch of that kind of change is included at the end of this description), which runs in Executor1, reads from partition 2 (the second value inside the square brackets) instead of 4, 5, 6, or 7.
> {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 default [8topic, 2, 0, null, 1]{noformat}
> This behavior is summarized in the table below:
> {noformat}
> Task IDs 3, 4, 7, 8, 9, 11, 15, 18    --->  Partitions 0, 1, 2, 3
> Task IDs 5, 6, 10, 12, 13, 14, 16, 17 --->  Partitions 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of the log file here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
>  
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to Executor2? Are (3 10) not the starting and ending task IDs in Executor1? Another interesting thing to note is that task IDs 10 and 15 are always reading from the partitions they claimed to be reading from (while setting the partition assignments).
> If my assumptions are correct, there is a bug in the way the mapping information is passed to the worker logs. If not, we need to make changes in our docs.
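
For reference, a minimal sketch of the kind of debug change mentioned above (logging the task ID next to the component ID when a spout opens). The class name is hypothetical and this is not the actual STORM-2506 debugging patch; getThisTaskId() and getThisComponentId() are existing TopologyContext methods.

{code:title=Illustrative task-ID logging sketch|borderStyle=solid}
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: a spout that records which task ID it runs as, so that
// emit-level log lines can be matched back to an executor's task range.
public class TaskIdLoggingSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(TaskIdLoggingSpout.class);
    private int taskId;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.taskId = context.getThisTaskId();
        LOG.info("Opened component [{}] as task [{}]", context.getThisComponentId(), taskId);
    }

    @Override
    public void nextTuple() {
        // No-op in this sketch; a real spout would emit tuples here and could
        // include taskId in its log statements, as in the quoted log line.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields declared in this sketch.
    }
}
{code}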



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (STORM-2539) Locality aware grouping, which is a new grouping method considering locality

2017-05-31 Thread Siwoon Son (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siwoon Son updated STORM-2539:
--
Description: 
I’d like to propose a new grouping method. It addresses problems that can occur 
with the existing _Shuffle grouping_ and _Local-or-shuffle grouping_ methods.

I was motivated by the following question.
bq. "Would it not be more efficient to transfer tuples to nearby nodes?"
Among Storm's grouping methods, _Shuffle grouping_ randomly selects a task of 
the downstream component. All nodes receive data evenly, but the same amount of 
data is also sent to relatively distant nodes, which can cause high delay. To 
solve this problem, Storm provides _Local-or-shuffle grouping_, which considers 
locality.
_Local-or-shuffle grouping_ minimizes delay by processing data inside the same 
process when a receiving task lives in that process. However, it considers 
*only locality*, which may lead to two load balancing problems: +sender 
imbalance+ and +receiver imbalance+. The sender imbalance problem concentrates 
traffic on specific tasks because the number of sender tasks and the number of 
nodes are not equal. The receiver imbalance problem likewise concentrates 
traffic on specific tasks because the number of receiver tasks and the number 
of nodes are not equal. When these problems occur, receiver tasks perform 
different amounts of work, resulting in performance degradation and processing 
delays.
|!sender-imbalance.png|width=400!|!receiver-imbalance.png|width=400!|
|(a) Example of sender imbalance problem.|(b) Example of receiver imbalance 
problem.|
So, I propose locality aware grouping, which addresses these load balancing 
problems while still considering locality. Locality aware grouping periodically 
measures the ping response time between nodes and probabilistically sends more 
tuples to nodes with lower response times. I implemented the proposed locality 
aware grouping at 
[https://github.com/dke-knu/i2am/tree/master/i2am-app/locality-aware-grouping]. 
LocalityAwareGrouping.java is the class that implements locality aware grouping; 
LocalityGroupingTestTopology.java, TupleGeneratorSpout.java, and 
PerformanceLoggingBolt.java are topology classes for testing it. The 
LocalityAwareGrouping$prepare() method reads the network information of each 
node from ZooKeeper and starts a thread that periodically measures the ping 
response time of each node. The LocalityAwareGrouping$chooseTasks() method 
selects a task with higher probability for nodes with lower network response 
times (a small sketch of this selection step follows below).
The implementation is straightforward, though. To calculate the ping between 
nodes, I need the network information of the nodes on which the tasks are 
running. I obtain this information from ZooKeeper using the Zookeeper$getData() 
method. To create a Zookeeper object, I currently have no choice but to receive 
the ZooKeeper connection information from the user.
Please let me know if there is a way to get the network information of each 
node without requiring additional parameters from the user, and if you have any 
additional comments.
Thank you for reading.
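
A minimal, self-contained sketch of the probabilistic selection step described above (weight = 1 / ping time, so tasks on nodes with lower ping are chosen more often). Names are illustrative and do not exactly match the linked LocalityAwareGrouping implementation:

{code:title=Illustrative ping-weighted task selection|borderStyle=solid}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of weighted random task selection: lower ping => higher
// probability of receiving the tuple.
public final class PingWeightedChooser {

    private PingWeightedChooser() {
    }

    /** Picks one task ID, weighting each task by the inverse of its node's ping time (ms). */
    public static int choose(List<Integer> taskIds, Map<Integer, Double> pingMsByTask) {
        double[] weights = new double[taskIds.size()];
        double totalWeight = 0.0;
        for (int i = 0; i < taskIds.size(); i++) {
            double pingMs = Math.max(pingMsByTask.getOrDefault(taskIds.get(i), 1.0), 0.001);
            weights[i] = 1.0 / pingMs;   // lower ping => higher weight
            totalWeight += weights[i];
        }
        double r = ThreadLocalRandom.current().nextDouble(totalWeight);
        for (int i = 0; i < weights.length; i++) {
            r -= weights[i];
            if (r <= 0) {
                return taskIds.get(i);
            }
        }
        return taskIds.get(taskIds.size() - 1);   // numerical fallback
    }
}
{code}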


  was:
I’d like to propose a new grouping method. This method solves problems that can 
occur with existing _Shuffle grouping_ method and _Local-or-shuffle_ method.

I was motivated by the following a question.
bq. "Would not it be more efficient to transfer tuples to nearby nodes?"
Among the Storm's grouping methods, _Shuffle grouping_ is a method of randomly 
selecting a task of the next node. In this method, all nodes can receive data 
evenly, but the same amount of data is transferred to a relatively distant 
node, which can cause a high delay. To solve this problem, Storm provides 
Local-or-shuffle grouping considering locality.
_Local-or-shuffle grouping_ can minimize the delay by internally processing 
data in the process if the task receiving the data is in the same process. 
Local-or-shuffle grouping, however, considers *only locality*, which may leads 
to two load balancing problems: +the sender imbalance+ and +the receiver 
imbalance+. First, the sender imbalance problem is a load balancing problem in 
which traffic is concentrated on a specific task because the number of senders’ 
tasks and the number of nodes are not equal. Next, the receiver imbalance 
problem is a load balancing problem in which traffic is concentrated on a 
specific object because the number of receivers' tasks and the number of nodes 
are not equal. If these problems occur, the tasks of receivers will perform 
different amounts of work, resulting in performance degradation and processing 
delays.
