[
https://issues.apache.org/jira/browse/SAMZA-335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064117#comment-14064117
]
Raul Castro Fernandez commented on SAMZA-335:
---------------------------------------------
Disclaimer: I haven't thought through the details of what I am writing next,
but this is one idea:
*What about just relying on Kafka's leader balancing to exploit data
locality?*
This would be more or less how it works:
- A Samza task always runs on a node that hosts the Kafka leader for one of
its input partitions. Since Kafka's load-balancing strategy already tries to
spread leaders across brokers, this gives a baseline placement strategy (a
rough sketch of the YARN side follows below).
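To make the baseline concrete, here is one way the YARN side could look,
assuming the application master already knows which host currently leads each
task's input partition (the leader lookup itself is sketched further down).
YARN treats the node list as a preference, not a guarantee, and the memory/CPU
numbers are purely illustrative:
{code:java}
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.util.Records;

public class LeaderLocalPlacement {
  // Ask YARN for a container on the host that currently leads the task's
  // input partition. "leaderHost" is assumed to come from Kafka metadata
  // (e.g. the ZooKeeper lookup sketched later in this comment).
  public static ContainerRequest requestOnLeaderHost(String leaderHost) {
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(1024);      // container memory in MB (illustrative)
    capability.setVirtualCores(1);   // illustrative

    Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(0);

    // Node-local preference: YARN will try the named host first; it only
    // becomes a hard constraint if relaxed locality is disabled.
    return new ContainerRequest(
        capability,
        new String[] { leaderHost },  // preferred nodes
        null,                         // no rack preference
        priority);
  }

  public static void submit(AMRMClient<ContainerRequest> amClient, String leaderHost) {
    amClient.addContainerRequest(requestOnLeaderHost(leaderHost));
  }
}
{code}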
This strategy, of course, will lead to bottlenecks. As discussed in SAMZA-334,
there are many additional sources of bottlenecks, and they do not necessarily
depend only on the input data. More generally, one cannot predict these
bottlenecks before actually running the tasks.
*How to address bottlenecks?*
Once a node becomes a bottleneck and we detect it (according to some policy X),
we trigger a Kafka leader re-election (the mechanism), so that one or more
leaders move to other nodes, along with the Samza task, thus helping to
overcome the bottleneck.
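Just to make the trigger half concrete: one way to drive a re-election is the
same path Kafka's preferred-replica-election tool uses, i.e. writing a request
node under /admin in ZooKeeper. Treat the details below as assumptions: the
detection policy (X) is only a placeholder lag threshold, the JSON layout is my
recollection of the 0.8.x format, and actually moving the leader to a different
broker would additionally need a partition reassignment, which this sketch
glosses over:
{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LeaderReelectionTrigger {
  // Sketch of the "trigger" half only. The bottleneck-detection policy (X)
  // is a placeholder; here it is simply "consumer lag above a threshold".
  public static boolean isBottlenecked(long consumerLagMessages) {
    final long LAG_THRESHOLD = 1_000_000L;  // illustrative threshold
    return consumerLagMessages > LAG_THRESHOLD;
  }

  // Ask the Kafka controller to re-elect the leader for one partition by
  // writing the same ZooKeeper node that kafka-preferred-replica-election.sh
  // writes. The JSON layout is assumed from the 0.8.x tool.
  public static void triggerReelection(ZooKeeper zk, String topic, int partition)
      throws Exception {
    String json = String.format(
        "{\"partitions\":[{\"topic\":\"%s\",\"partition\":%d}]}", topic, partition);
    zk.create("/admin/preferred_replica_election",
              json.getBytes("UTF-8"),
              ZooDefs.Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT);
  }
}
{code}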
*How do we know the new node for the leader?*
We cannot tell YARN where to run a task. However, our own infrastructure knows
where things are running, i.e. ZooKeeper knows where the new leader for the
partition is running.
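For reference, the lookup is roughly this: Kafka keeps the current leader's
broker id in the partition-state znode, and the broker registration znode maps
that id to a host. The JSON handling below is deliberately naive to keep the
sketch short; a real implementation would use a proper parser:
{code:java}
import org.apache.zookeeper.ZooKeeper;

public class LeaderLookup {
  // Read the broker id of the current leader from the partition-state znode,
  // then resolve it to a host via the broker registration znode.
  public static String leaderHost(ZooKeeper zk, String topic, int partition)
      throws Exception {
    String statePath = "/brokers/topics/" + topic + "/partitions/" + partition + "/state";
    String state = new String(zk.getData(statePath, false, null), "UTF-8");
    String leaderId = extractField(state, "leader");      // e.g. "2"

    String brokerPath = "/brokers/ids/" + leaderId;
    String broker = new String(zk.getData(brokerPath, false, null), "UTF-8");
    return extractField(broker, "host");                  // e.g. "node-17"
  }

  // Crude extraction for values like "leader":2 or "host":"node-17";
  // assumes the field is present in the JSON.
  private static String extractField(String json, String field) {
    int i = json.indexOf("\"" + field + "\":") + field.length() + 3;
    int j = i;
    while (j < json.length() && ",}".indexOf(json.charAt(j)) < 0) j++;
    return json.substring(i, j).replace("\"", "").trim();
  }
}
{code}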
*How does that work? How does the Samza task move from one node to another?*
One idea is to run all tasks on all nodes in the cluster. Sounds crazy, but the
trick is that they are not really running; they are "ready to run". In this
way, any container on any node can run any Samza task. Each node has a number
of containers equal (at least) to the number of partitions (both leaders and
followers) hosted on that node.
With this scheme, once a partition's leader moves to a node, there is already a
container "reserved" on that node to execute the corresponding Samza task.
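One way to read "ready to run": each node runs a thin standby wrapper per local
partition replica that watches the leader and only activates the real Samza
task when leadership lands locally. Everything below is hypothetical (the
class, the startTask/stopTask hooks, polling instead of a ZooKeeper watch); it
reuses the leaderHost lookup from the earlier sketch:
{code:java}
public class StandbyContainer {
  private final String topic;
  private final int partition;
  private final String localHost;
  private volatile boolean running = false;

  public StandbyContainer(String topic, int partition, String localHost) {
    this.topic = topic;
    this.partition = partition;
    this.localHost = localHost;
  }

  // Poll the partition leader; the container sits idle until the leader is
  // local, then activates the Samza task. startTask()/stopTask() are
  // hypothetical hooks into the container runtime.
  public void watchLeadership(org.apache.zookeeper.ZooKeeper zk) throws Exception {
    while (true) {
      String leader = LeaderLookup.leaderHost(zk, topic, partition);
      if (leader.equals(localHost) && !running) {
        running = true;
        startTask();      // leadership arrived here: begin processing
      } else if (!leader.equals(localHost) && running) {
        running = false;
        stopTask();       // leadership moved away: go back to standby
      }
      Thread.sleep(5000); // a real implementation would use a ZK watch instead
    }
  }

  private void startTask() { /* hand the partition to the local Samza task */ }
  private void stopTask()  { /* quiesce and release the partition */ }
}
{code}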
*What about state?*
As Martin pointed out, the problem with a task that suddenly runs on a
different node is that its state is now remote -> bad. One strategy to address
this:
The Samza task is "associated" with a given leader and a given state. The
state has a changelog, which is itself replicated. We need to make sure that
the replicas of the changelog live in the same place as the followers of the
leader. In this way, if a Samza task needs to "migrate", it will find its own
replicated state on the target machine.
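A sketch of the intended invariant: create the changelog topic with a manual
replica assignment copied, partition by partition, from the input topic's
replica lists, so changelog replicas land on the same brokers as the input
followers. The output format is the one accepted by kafka-topics.sh
--replica-assignment; how the input replica map is obtained (e.g. from the
/brokers/topics/<topic> znode) is left out here:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ChangelogPlacement {
  // Build a manual replica assignment for the changelog topic that mirrors
  // the input topic's replica placement, partition by partition. The result
  // is in the "r1:r2,r1:r2,..." form accepted by kafka-topics.sh
  // --replica-assignment. inputReplicas maps partition -> broker ids.
  public static String mirrorAssignment(Map<Integer, List<Integer>> inputReplicas) {
    StringBuilder out = new StringBuilder();
    for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(inputReplicas).entrySet()) {
      if (out.length() > 0) out.append(",");
      StringBuilder part = new StringBuilder();
      for (int broker : e.getValue()) {
        if (part.length() > 0) part.append(":");
        part.append(broker);
      }
      out.append(part);  // changelog partition i gets the same brokers as input partition i
    }
    return out.toString();
  }
}
{code}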
{quote}
At the moment, every time you restart a Samza job, any state that it has built
up must be reconstructed from the changelog, which may take a while if there's
a lot of state. As an optimization, in order to make job restarts faster, it
might be possible to reuse any on-disk state store across container restarts,
and skip the changelog restore in that case.
{quote}
This is a good point. In general, it is worth understanding whether it is
cheaper to rebuild the state or to checkpoint it and back it up to disk.
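That restart optimization could be decided per store with a small marker
written next to the on-disk data: if the store records the changelog offset it
was last consistent with, restore only the tail from there; otherwise fall back
to a full rebuild. The OFFSET file name and layout below are hypothetical:
{code:java}
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;

public class StoreRestore {
  // If the on-disk store left a marker with the changelog offset it was
  // consistent with, resume the restore from that offset instead of replaying
  // the whole changelog. "OFFSET" is a hypothetical marker file written on
  // clean shutdown / flush.
  public static long restoreFromOffset(File storeDir) throws Exception {
    Path marker = new File(storeDir, "OFFSET").toPath();
    if (Files.exists(marker)) {
      String s = new String(Files.readAllBytes(marker), "UTF-8").trim();
      return Long.parseLong(s);   // restore changelog entries after this offset
    }
    return 0L;                    // no usable local state: full changelog rebuild
  }
}
{code}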
I believe something along these lines would provide some "data locality" by
exploiting Kafka's leader load balancing. It would also provide an auto-scaling
facility for Samza, and it would remain compatible with stateful jobs, while
still benefiting from YARN's capabilities.
Drawbacks:
- It is a Kafka-centric solution.
- Kafka leader balancing may require some rethinking.
- Not entirely convinced that this is better than just writing a replica-aware
YARN scheduler (basically, this would require YARN to understand a dataflow
graph of containers instead of single containers).
- Samza might need to read the state that Kafka stores in ZooKeeper, although
this can probably be worked around.
> Locality for Samza containers
> -----------------------------
>
> Key: SAMZA-335
> URL: https://issues.apache.org/jira/browse/SAMZA-335
> Project: Samza
> Issue Type: Improvement
> Components: container
> Reporter: Chinmay Soman
>
> [Documenting my discussion with Jakob]
> There doesn't seem to be any 'data' locality for deploying containers. Today,
> this is done purely based on the free resources. It'll be nice to have some
> kind of locality with respect to the input streams.
--
This message was sent by Atlassian JIRA
(v6.2#6252)