[
https://issues.apache.org/jira/browse/SAMZA-71?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini updated SAMZA-71:
---------------------------------
Fix Version/s: 0.8.0
> Support new partitioning strategies
> -----------------------------------
>
> Key: SAMZA-71
> URL: https://issues.apache.org/jira/browse/SAMZA-71
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Jakob Homan
> Labels: project
> Fix For: 0.8.0
>
>
> Currently, the number of StreamTask instances created for a Samza job is
> equal to the max number of partitions across all input streams. For
> example, if your Samza job is using two YARN containers, and has two input
> streams (IS1 and IS2), and IS1 has 4 partitions, and IS2 has 8 partitions,
> then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8
> StreamTask instances would be created (spread as evenly as possible across
> two YARN containers).
> This scheme works for co-grouping when both input streams have the same
> number of partitions, and are partitioned using the same partitioning scheme
> (e.g. a Kafka partitioner that partitions by member ID). The parallelism of
> the job is limited by the number of StreamTask instances, which means that
> the parallelism is limited by the max number of partitions across all input
> streams (8 in the example above).
> We can actually do better than these guarantees. We should change Samza's
> partitioning style to behave in the following way:
> 1. Support a task.partition.scheme=max config. Samza will create one stream
> task instance for each input stream partition. In the example above, IS1 has
> 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12
> StreamTasks. Each input stream's partition would be mapped to a unique
> StreamTask that would receive messages ONLY from that input stream/partition
> pair, and from no other. Using this style of partitioning increases a Samza
> job's possible parallelism to the absolute maximum (given Kafka semantics,
> which allow only a single consumer for each input stream/partition pair).
> 2. Support a task.partition.scheme=cogroup config. Samza will create as many
> StreamTask instances as the greatest common divisor (GCD) of all input
> stream partition counts. In the example above, IS1 has 4 partitions, and
> IS2 has 8. GCD(4,8)=4, so Samza would create four StreamTask instances. If
> IS1 had
> 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two
> StreamTask instances. Using this style can decrease a Samza job's
> parallelism, but provides the guarantee that a StreamTask instance will
> receive all messages across all input streams for a key that it's in charge
> of. For example, if a StreamTask is consuming AdViews and AdClicks, and both
> are partitioned by member ID, but AdViews has 12 partitions, and AdClicks has
> 8 partitions, then there will be 4 StreamTask instances, and each instance
> will receive roughly 1/4 of all clicks and views, and all clicks and views
> for a given member ID will be mapped to just one of the StreamTask instances, so
> aggregation/joining will be possible.
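
The two task counts described in the list above can be sketched in a few lines of Python. This is only an illustration of the arithmetic, not Samza code: the function name task_count and the dict of partition counts are hypothetical, and the scheme names are taken from the proposed task.partition.scheme values.

```python
from functools import reduce
from math import gcd


def task_count(partition_counts, scheme="max"):
    """Return how many StreamTask instances a scheme would create.

    partition_counts is a hypothetical dict of stream name -> partition count.
    """
    counts = list(partition_counts.values())
    if scheme == "max":
        # One StreamTask per input stream/partition pair.
        return sum(counts)
    if scheme == "cogroup":
        # GCD of all partition counts, folded pairwise across the streams.
        return reduce(gcd, counts)
    raise ValueError("unknown partition scheme: %s" % scheme)
```

With the numbers from the description, task_count({"IS1": 4, "IS2": 8}, "max") gives 12 and task_count({"IS1": 4, "IS2": 8}, "cogroup") gives 4.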
> The default task.partition.scheme will be max, when the user hasn't specified
> a partition scheme. Thus, the default will not allow any aggregation or
> joining across input streams.
> With both of these styles, we can still use the Partition class (and
> getPartitionId) to identify each StreamTask instance, but we will need to
> devise a deterministic way to map from each input stream/partition pair to
> each StreamTask partition.
> In the case of style #1 (max), consider the case where we have IS1 with 4
> partitions and IS2 with 8 partitions. We can use the order of task.inputs to
> define an ordering across stream names. We can then instantiate all 12
> StreamTasks, and simply iterate over all input streams based on their
> task.inputs order and sorted partition sets to do the mapping. If we had
> task.inputs=IS2,IS1, the mapping would look like this:
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:4
> IS2:5 - StreamTask:5
> IS2:6 - StreamTask:6
> IS2:7 - StreamTask:7
> IS1:0 - StreamTask:8
> IS1:1 - StreamTask:9
> IS1:2 - StreamTask:10
> IS1:3 - StreamTask:11
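
The table above can be reproduced with a short sketch. It assumes partition IDs are the contiguous integers 0..n-1 for each stream; the function name max_scheme_mapping and its arguments are hypothetical and only mirror the task.inputs ordering described here.

```python
def max_scheme_mapping(task_inputs, partition_counts):
    """Map each (stream, partition) pair to a unique StreamTask ID.

    task_inputs is the ordered list of stream names (as in task.inputs);
    partition_counts is a hypothetical dict of stream name -> partition count.
    """
    mapping = {}
    next_task = 0
    # Walk the streams in task.inputs order, and each stream's partitions in
    # sorted order, handing out consecutive StreamTask IDs.
    for stream in task_inputs:
        for partition in range(partition_counts[stream]):
            mapping[(stream, partition)] = next_task
            next_task += 1
    return mapping
```

For task.inputs=IS2,IS1, calling max_scheme_mapping(["IS2", "IS1"], {"IS1": 4, "IS2": 8}) assigns IS2:0 through IS2:7 to StreamTasks 0 through 7 and IS1:0 through IS1:3 to StreamTasks 8 through 11.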
> In the case of style #2 (cogroup), consider the case where IS1 has 8
> partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be
> created. The mapping in this case should then be:
> IS1:0 - StreamTask:0
> IS1:1 - StreamTask:1
> IS1:2 - StreamTask:2
> IS1:3 - StreamTask:3
> IS1:4 - StreamTask:0
> IS1:5 - StreamTask:1
> IS1:6 - StreamTask:2
> IS1:7 - StreamTask:3
> IS2:0 - StreamTask:0
> IS2:1 - StreamTask:1
> IS2:2 - StreamTask:2
> IS2:3 - StreamTask:3
> IS2:4 - StreamTask:0
> IS2:5 - StreamTask:1
> IS2:6 - StreamTask:2
> IS2:7 - StreamTask:3
> IS2:8 - StreamTask:0
> IS2:9 - StreamTask:1
> IS2:10 - StreamTask:2
> IS2:11 - StreamTask:3
> As you can see, the assignment is done by modding each input stream's
> partition number by the GCD value (4, in this case). This assignment strategy
> has the nice guarantee that keys will map to the same StreamTask across input
> streams with different partition counts (provided that they're partitioned by
> the same key). For example, member ID 1213 % 8 = partition 5 in IS1, and
> 1213 % 12 = partition 1 in IS2. If you then mod by the GCD (4), you get
> 5 % 4 = 1 and 1 % 4 = 1. The same holds true for other keys, as well.
> 1211%8=3 .. 3%4=3
> 1211%12=11 .. 11%4=3
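
The co-location property can be checked mechanically. The sketch below assumes, as the examples above do, that each stream's partitioner is simply key % partition count; the function names are hypothetical. The property holds because the GCD divides every partition count, so (key % n) % g equals key % g.

```python
from functools import reduce
from math import gcd


def cogroup_task(partition, task_total):
    """StreamTask ID for an input partition under the cogroup scheme."""
    return partition % task_total


def task_for_key(key, stream_partitions, task_total):
    """StreamTask that sees a key, assuming the partitioner is key % partitions."""
    return cogroup_task(key % stream_partitions, task_total)


def keys_colocate(keys, partition_counts):
    """True if every key lands on the same StreamTask in all input streams."""
    task_total = reduce(gcd, partition_counts)
    return all(
        len({task_for_key(k, n, task_total) for n in partition_counts}) == 1
        for k in keys
    )
```

For IS1 with 8 partitions and IS2 with 12, keys_colocate(range(5000), [8, 12]) returns True, and task_for_key(1213, 8, 4) and task_for_key(1213, 12, 4) both give 1.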
> Both of these partition assignment schemes work only as long as the
> task.inputs stream order is guaranteed to be static (or new streams are only
> appended to the end), and each input stream's partition count is static and
> never changes.
> You can use the Euclidean algorithm to find the GCD:
> http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html
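
For reference, a minimal sketch of the Euclidean algorithm for two non-negative partition counts. The function name euclid_gcd is hypothetical; Python's standard library already provides math.gcd, which could be used instead.

```python
def euclid_gcd(a, b):
    """Greatest common divisor of two non-negative integers."""
    # Repeatedly replace (a, b) with (b, a mod b) until the remainder is zero;
    # the last non-zero value is the GCD.
    while b:
        a, b = b, a % b
    return a
```

With the partition counts used in this description, euclid_gcd(4, 8) is 4, euclid_gcd(4, 6) is 2, and euclid_gcd(8, 12) is 4.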
--
This message was sent by Atlassian JIRA
(v6.2#6252)