[
https://issues.apache.org/jira/browse/SAMZA-516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286772#comment-14286772
]
Chris Riccomini commented on SAMZA-516:
---------------------------------------
My current line of thinking is to write a StreamJob implementation that
orchestrates jobs via ZooKeeper. The basic flow would look something like this:
h3. Starting a single process.
# Run {{run-job.sh}} on machine-1.
# {{run-job.sh}} uses the supplied job config to instantiate
StandaloneJobFactory, and gets a StandaloneJob.
# StandaloneJob connects to ZK.
# StandaloneJob adds itself as a process in the {{job group}} ZK directory
using an ephemeral node.
# StandaloneJob sets itself as a watcher in the {{job group}} container
assignment directory.
## When StandaloneJob receives container assignments, it creates a new thread,
and instantiates a new SamzaContainer on the thread.
## When StandaloneJob loses a container assignment, it interrupts the
appropriate thread, which shuts down the SamzaContainer.
# StandaloneJob checks if there is a JobCoordinator leader elected.
# If there is no leader elected, StandaloneJob tries to elect itself as leader,
and assigns containers to processes in the {{job group}}.
## Check for the number of connected processes in the {{job group}}.
## Use job.container.count to create a JobCoordinator with containerCount set
accordingly.
## Assign containers equally among all processes in the {{job group}}.
## Set a watcher on the {{job group}} ZK directory.
# Each SamzaContainer will then query the JobCoordinator via its HTTP API to
bootstrap itself and start processing.
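The assignment in step (7.3) is just round-robin over the connected processes. A rough Java sketch of that step (class and method names are hypothetical, not actual Samza APIs):

```java
import java.util.*;

// Hypothetical sketch of step (7.3): distribute job.container.count
// containers round-robin across the processes registered in the job group.
class ContainerAssigner {
  // Returns processId -> list of container IDs assigned to it.
  static Map<String, List<Integer>> assign(List<String> processes, int containerCount) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String p : processes) {
      assignment.put(p, new ArrayList<>());
    }
    // Round-robin, so per-process counts differ by at most one.
    for (int c = 0; c < containerCount; c++) {
      assignment.get(processes.get(c % processes.size())).add(c);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // With one process connected, it simply owns all containers.
    System.out.println(assign(Arrays.asList("process-1"), 4));
  }
}
```

The leader would write this map into the {{job group}} container-assignment directory, and the watcher from step (5) would start the corresponding containers.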
h3. Starting a second process.
# All of the above steps are repeated. The second {{run-job.sh}} command could
be run on machine-1, or some other machine.
# A leader is already elected, so no leadership election happens for the
JobCoordinator.
# When this process registers itself in ZK (step (4), above), the watcher set
in step (7.4) will be notified.
# Watcher set in (7.4) will re-assign partitions from the first process to the
second.
# When reassignment happens, watcher set in (5) will trigger the start of new
containers in this process.
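The reassignment triggered by the new registration only needs to move enough containers to even things out, so existing containers mostly keep running in place. A rough Java sketch of such a minimal-movement rebalance (names hypothetical, not actual Samza APIs):

```java
import java.util.*;

// Hypothetical sketch of the rebalance the leader performs when the
// job group membership changes: keep each live process's containers up
// to its fair share, pool the rest, and hand the pool to processes
// below their share.
class Rebalancer {
  static Map<String, List<Integer>> rebalance(
      Map<String, List<Integer>> current, List<String> liveProcesses) {
    int total = current.values().stream().mapToInt(List::size).sum();
    int base = total / liveProcesses.size();
    int extra = total % liveProcesses.size();

    // Containers owned by processes no longer in the group go to the pool.
    Set<String> live = new HashSet<>(liveProcesses);
    Deque<Integer> pool = new ArrayDeque<>();
    for (Map.Entry<String, List<Integer>> e : current.entrySet()) {
      if (!live.contains(e.getKey())) {
        pool.addAll(e.getValue());
      }
    }

    // Trim each live process down to its target share.
    Map<String, List<Integer>> next = new LinkedHashMap<>();
    int i = 0;
    for (String p : liveProcesses) {
      int target = base + (i++ < extra ? 1 : 0);
      List<Integer> kept = new ArrayList<>(current.getOrDefault(p, Collections.emptyList()));
      while (kept.size() > target) pool.add(kept.remove(kept.size() - 1));
      next.put(p, kept);
    }
    // Top up processes below their target from the pool.
    i = 0;
    for (String p : liveProcesses) {
      int target = base + (i++ < extra ? 1 : 0);
      List<Integer> l = next.get(p);
      while (l.size() < target) l.add(pool.poll());
    }
    return next;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> cur = new LinkedHashMap<>();
    cur.put("process-1", Arrays.asList(0, 1, 2, 3, 4));
    // process-2 just joined: it should take some of process-1's containers.
    System.out.println(rebalance(cur, Arrays.asList("process-1", "process-2")));
  }
}
```

Only the containers that actually change owners get interrupted and restarted on the new process; the rest never notice the membership change.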
h3. Handling failover.
# The first process fails.
# Its ephemeral node times out of ZooKeeper.
# The second process is notified via ZK that the process in the {{job group}}
that was the owner of the JobCoordinator has failed.
# The second process elects itself as JobCoordinator leader.
# The new JobCoordinator diffs the list of processes in the {{job group}} with
the container assignments.
# Any containers that were assigned to the dead processes are shifted to live
processes. This shift triggers the containers to start via the watcher defined
in step (5.1).
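The diff in step (5) above boils down to collecting the containers whose assigned process no longer has an ephemeral node. A rough Java sketch (names hypothetical, not actual Samza APIs):

```java
import java.util.*;

// Hypothetical sketch of the failover diff: compare the container
// assignment against the live job group members, and collect the
// containers whose owner has disappeared.
class FailoverDiff {
  static List<Integer> orphanedContainers(
      Map<String, List<Integer>> assignment, Set<String> liveProcesses) {
    List<Integer> orphaned = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
      if (!liveProcesses.contains(e.getKey())) {
        orphaned.addAll(e.getValue());
      }
    }
    return orphaned;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    assignment.put("process-1", Arrays.asList(0, 1));
    assignment.put("process-2", Arrays.asList(2, 3));
    // process-1's ephemeral node expired; its containers are orphaned.
    System.out.println(orphanedContainers(assignment, new HashSet<>(Arrays.asList("process-2"))));
  }
}
```

The new leader would then distribute the orphaned containers over the surviving processes and write the updated assignment, which the step (5) watchers pick up.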
h3. Notes
* The StandaloneJob could use processes instead of threads to run containers.
This would allow task.opts to be set on SamzaContainers, and would also keep
container code fully isolated from the JobCoordinator, and StandaloneJob ZK
logic.
* The process:container:stream task:partition nomenclature is pretty
cumbersome. We're also adding a new concept here: something above the
container. Currently, in YARN, the YARN container to SamzaContainer mapping is
1:1. The approach described above breaks this model, and makes it 1:*. The
benefit of this is that it means we can shift SamzaContainers amongst Java
processes without stopping existing containers. It also means we don't have to
change any SamzaContainer code--everything in there can remain immutable.
* When we move the AM UI out of YARN and into JobCoordinator, you'll be able to
use the UI in standalone mode, but the UI will jump from node to node as
machines fail. This seems kind of annoying.
* Does it make sense to have the JobCoordinator run as an independent process?
When it's down, existing containers continue to run, but no new containers can
be added (I think).
* This design doesn't support multi-job queries. For example, if you wanted to
run "SELECT member\_id, COUNT(*) FROM stream GROUP BY member\_id", you
basically can't, since this query requires running two jobs (one to
repartition, and one to aggregate).
* I haven't worked through the details on whether we could end up with orphaned
processes that continue producing even after the coordinator thinks they're
failed. This concern was voiced on the mailing list. If we can't safely rely on
ZK to notify us when a container is ACTUALLY dead, then we could end up in
split-brain scenarios, where we have multiple containers processing the same
data.
> Support standalone Samza jobs
> -----------------------------
>
> Key: SAMZA-516
> URL: https://issues.apache.org/jira/browse/SAMZA-516
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
>
> Samza currently supports two modes of operation out of the box: local and
> YARN. With local mode, a single Java process starts the JobCoordinator,
> creates a single container, and executes it locally. All partitions are
> processed within this container. With YARN, a YARN grid is required to
> execute the Samza job. In addition, SAMZA-375 introduces a patch to run Samza
> in Mesos.
> There have been several requests lately to be able to run Samza jobs without
> any resource manager (YARN, Mesos, etc), but still run it in a distributed
> fashion.
> The goal of this ticket is to design and implement a samza-standalone module,
> which will:
> # Support executing a single Samza job in one or more containers.
> # Support failover, in cases where a machine is lost.