Hey Jae,

Every resource manager has to solve the split-brain/orphaned container
problem. There are a couple of scenarios to test:

1. Simulate a network partition between the master (RM in YARN) and slave
(NM in YARN).
2. `kill -9` the slave (NM in YARN).

In YARN's case, I know for sure that (2) will result in the containers
being leaked. The container processes get re-parented, so their PPID is
switched to 1 (init). This is just how UNIX handles orphaned processes. I
suspect `kill -9`'ing the slave in Mesos will result in the same behavior.
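
If you want to see this for yourself, here's a rough, Linux-only watchdog
sketch (my code, nothing like this exists in Samza) that a container could
run to detect that it has been re-parented to init:

  import java.nio.file.Files;
  import java.nio.file.Paths;

  // Rough sketch: poll /proc/self/stat and exit once we've been orphaned.
  public class OrphanWatchdog {
    public static void main(String[] args) throws Exception {
      while (true) {
        // Field 4 of /proc/self/stat is the parent PID. The naive split
        // works here because "java" has no spaces in its comm field.
        String[] stat = new String(
            Files.readAllBytes(Paths.get("/proc/self/stat"))).split(" ");
        if (Long.parseLong(stat[3]) == 1) {
          System.err.println("Parent died; we're orphaned. Shutting down.");
          System.exit(1);
        }
        Thread.sleep(10000);
      }
    }
  }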

For (1), every distributed system has to solve this. How do you detect a
real partition (vs. a long GC pause, for example), and when you do detect
one, how do you react to it?

I am testing (1) for YARN right now (using hello-samza, and killing the
RM). I will let you know how it behaves shortly. I believe the NM retries
connecting to the RM for some period of time, and then kills itself if it
can't reconnect. If this is the case, then the container *would not be
orphaned*. I also believe the retry interval and maximum wait time are
tunable, so you can define your own exposure (e.g. you have a duplicate
container for at most 1 minute before the NM shuts itself down).
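
If memory serves, these are the relevant knobs in yarn-site.xml (property
names from memory, so double-check them against yarn-default.xml for your
Hadoop version):

  <property>
    <!-- Total time the NM keeps trying to reconnect to the RM before it
         gives up and, I believe, shuts itself down. -->
    <name>yarn.resourcemanager.connect.max-wait.ms</name>
    <value>60000</value>
  </property>
  <property>
    <!-- How often it retries within that window. -->
    <name>yarn.resourcemanager.connect.retry-interval.ms</name>
    <value>10000</value>
  </property>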

Anecdotally, we've not seen leaked containers in YARN since we began
properly shutting down NMs (not kill -9'ing them).

> Depending on the timeline for stabilizing standalone versus Mesos support

Regarding stabilizing standalone, I'm working on the design doc right now.
A proposed sketch of a ZK-based implementation was posted on SAMZA-516
yesterday. My goal is to get the design doc done by tomorrow. This would
let us discuss and open subtasks next week, and start coding thereafter.
Realistically, I think standalone can be committed before end of Q1, and
should be usable. After a month or two of operation, I'd wager it'll be
relatively stable. So, that puts things at mid-Q2.

Cheers,
Chris

On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <[email protected]> wrote:

> I read through SAMZA-375. We will do one more round of PoC for Samza on
> Mesos.
>
> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <[email protected]>
> wrote:
>
> > I asked a Mantis guy about orphaned containers in Mesos, and he was
> > almost sure that Mesos won't let that happen.
> >
> > How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending
> > on the timeline for stabilizing standalone versus Mesos support, our
> > schedule or decision will change.
> >
> > Thank you
> > Best, Jae
> >
> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
> > [email protected]> wrote:
> >
> >> Hey all,
> >>
> >> Also, just opened this ticket to track work on samza-standalone:
> >>
> >>   https://issues.apache.org/jira/browse/SAMZA-516
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:
> >>
> >> >Hey Jae,
> >> >
> >> >> So, we need to verify that running Samza on Mesos won't create that
> >> >>problem, or that Spark Streaming won't have that issue. In the worst
> >> >>case, creating our own distributed coordination might be more
> >> >>predictable than running YARN on EMR.
> >> >
> >> >I think that there are two ways to fix this. One is to have the Kafka
> >> >broker detect that there are two producers that are "the same", and
> >> >start dropping messages from the "old one" (and perhaps throw an
> >> >exception to the old producer). The other way is to have the Samza
> >> >container detect the problem, and kill itself.
> >> >
> >> >The Kafka-based approach is a subset of the transactional messaging
> >> >feature described here:
> >> >
> >> >
> >> >https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >> >
> >> >The problem with the Kafka approach is that 1) it's Kafka-specific, and
> >> >2) the generation id required to drop messages from an orphaned producer
> >> >hasn't been implemented, except in a branch that's not been committed.
> >> >
> >> >So, if we accept that we shouldn't use Kafka as the solution for
> >> >detecting orphaned containers, the solution will have to go into Samza.
> >> >Within Samza, there are two approaches. One is to use the resource
> >> >scheduler (YARN, Mesos, etc.) to detect the problem. The other is to
> >> >use Samza itself to detect the problem.
> >> >
> >> >A YARN-specific example of how to solve the problem would be to have
> >> >the SamzaContainer periodically poll its local NM's REST endpoint:
> >> >
> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
> >> >
> >> >to see what the status is, its last update time, etc. If the REST
> >> >endpoint can't be reached, the node is unhealthy, or the last update
> >> >time is older than some threshold, the container could kill itself.
> >> >Again, this is YARN-specific.
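> >> >
> >> >Here's a rough, untested sketch of what I mean (my code, not anything
> >> >in Samza; the "nodeHealthy" field name is from memory, so verify it
> >> >against your Hadoop version's NM REST docs):
> >> >
> >> >  import java.io.InputStream;
> >> >  import java.net.HttpURLConnection;
> >> >  import java.net.URL;
> >> >  import java.util.Scanner;
> >> >
> >> >  // Poll the local NM and self-destruct if it looks dead.
> >> >  public class NodeManagerWatchdog {
> >> >    public static void main(String[] args) throws Exception {
> >> >      while (true) {
> >> >        try {
> >> >          URL url = new URL("http://localhost:8042/ws/v1/node/info");
> >> >          HttpURLConnection conn = (HttpURLConnection) url.openConnection();
> >> >          conn.setConnectTimeout(5000);
> >> >          conn.setReadTimeout(5000);
> >> >          try (InputStream in = conn.getInputStream();
> >> >               Scanner s = new Scanner(in).useDelimiter("\\A")) {
> >> >            String json = s.hasNext() ? s.next() : "";
> >> >            // Crude string check; a real version would parse the JSON.
> >> >            if (!json.contains("\"nodeHealthy\":true")) {
> >> >              System.exit(1);
> >> >            }
> >> >          }
> >> >        } catch (Exception e) {
> >> >          System.exit(1); // can't reach the NM at all: assume orphaned
> >> >        }
> >> >        Thread.sleep(10000);
> >> >      }
> >> >    }
> >> >  }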
> >> >
> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
> >> >SAMZA-375:
> >> >
> >> >https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
> >> >
> >> >The last solution that I mentioned, using Samza directly (no dependency
> >> >on Kafka, YARN, Mesos, etc), seems like the best long-term solution to
> >> >me. We can either 1) introduce a heartbeat message into the coordinator
> >> >stream, or 2) use the existing checkpoint message as a heartbeat. There
> >> >is some complexity to this solution that would need to be thought
> >> >through, though. For example, should the heartbeat messages be sent
> >> >from the main thread? What happens if the main thread is blocked on
> >> >process() for an extended period of time?
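> >> >
> >> >To make the discussion concrete, here's a hypothetical sketch of (1).
> >> >None of this exists in Samza today, and sendHeartbeat() stands in for
> >> >a coordinator-stream producer. The idea is to heartbeat from a side
> >> >thread, but only while the main loop is actually making progress:
> >> >
> >> >  import java.util.concurrent.Executors;
> >> >  import java.util.concurrent.ScheduledExecutorService;
> >> >  import java.util.concurrent.TimeUnit;
> >> >  import java.util.concurrent.atomic.AtomicLong;
> >> >
> >> >  public class HeartbeatSketch {
> >> >    private final AtomicLong lastTick =
> >> >        new AtomicLong(System.currentTimeMillis());
> >> >    private final ScheduledExecutorService exec =
> >> >        Executors.newSingleThreadScheduledExecutor();
> >> >
> >> >    // The run loop would call this after each process()/window() pass.
> >> >    public void onLoopTick() {
> >> >      lastTick.set(System.currentTimeMillis());
> >> >    }
> >> >
> >> >    public void start() {
> >> >      exec.scheduleAtFixedRate(() -> {
> >> >        // Don't claim liveness for a container wedged in process().
> >> >        if (System.currentTimeMillis() - lastTick.get() < 60000) {
> >> >          sendHeartbeat();
> >> >        }
> >> >      }, 0, 10, TimeUnit.SECONDS);
> >> >    }
> >> >
> >> >    private void sendHeartbeat() { /* write to the coordinator stream */ }
> >> >  }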
> >> >
> >> >What do others think? As a short-term fix, it seems to me like
> >> >YARN/Mesos should handle this automatically for us. Has anyone had
> >> >experience with orphaned containers in Mesos?
> >> >
> >> >> I would really appreciate it if you could give me some guidelines on
> >> >>implementing a custom cluster management interface for Samza.
> >> >
> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
> >> >This CLI uses JobRunner to instantiate a StreamJobFactory (defined with
> >> >job.factory.class), which returns a StreamJob. To implement your own
> >> >cluster management, the first thing you'll need to do is implement
> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
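> >> >
> >> >To give you a feel for the shape, here's a skeleton against those
> >> >interfaces (the EC2/ASG parts are entirely hypothetical; check the
> >> >method signatures against the version you're building on):
> >> >
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.job.ApplicationStatus;
> >> >  import org.apache.samza.job.StreamJob;
> >> >  import org.apache.samza.job.StreamJobFactory;
> >> >
> >> >  public class Ec2JobFactory implements StreamJobFactory {
> >> >    public StreamJob getJob(Config config) {
> >> >      return new Ec2Job(config);
> >> >    }
> >> >  }
> >> >
> >> >  class Ec2Job implements StreamJob {
> >> >    private final Config config;
> >> >
> >> >    Ec2Job(Config config) { this.config = config; }
> >> >
> >> >    public StreamJob submit() {
> >> >      // e.g. bump the ASG's desired capacity and launch containers via
> >> >      // instance user-data. This is where the real work goes.
> >> >      return this;
> >> >    }
> >> >
> >> >    public StreamJob kill() {
> >> >      return this; // tear the group down
> >> >    }
> >> >
> >> >    public ApplicationStatus waitForFinish(long timeoutMs) {
> >> >      return ApplicationStatus.SuccessfulFinish;
> >> >    }
> >> >
> >> >    public ApplicationStatus waitForStatus(ApplicationStatus status,
> >> >        long timeoutMs) {
> >> >      return status;
> >> >    }
> >> >
> >> >    public ApplicationStatus getStatus() {
> >> >      return ApplicationStatus.Running;
> >> >    }
> >> >  }
> >> >
> >> >You'd then point job.factory.class at Ec2JobFactory in your job config.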
> >> >
> >> >Note that this code has changed slightly between 0.8.0 and master
> >> >(0.9.0). In 0.9.0, the partition-to-container assignment logic has been
> >> >pulled out of YARN's AM, and into a JobCoordinator class.
> >> >
> >> >The trick with adding EC2 ASG support is going to be in handling
> >> >partition shifting when a new node is added to the group. For example,
> >> >if you have two machines, each running one container, and you add a
> >> >third machine, some of the input partitions (and corresponding
> >> >StreamTasks) need to be shifted from the two machines onto the third.
> >> >The only way to do this right now is to:
> >> >
> >> >1. Stop all containers.
> >> >2. Re-instantiate the JobCoordinator with a new container count.
> >> >3. Start new containers on all three machines with the new partition
> >> >assignments. (See the sketch below.)
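> >> >
> >> >In code, the stop-and-restart resize might look roughly like this
> >> >(hypothetical; it reuses the StreamJob interfaces above, and
> >> >"yarn.container.count" is the 0.8.x-era property for YARN jobs):
> >> >
> >> >  import java.util.HashMap;
> >> >  import java.util.Map;
> >> >  import org.apache.samza.config.Config;
> >> >  import org.apache.samza.config.MapConfig;
> >> >  import org.apache.samza.job.StreamJob;
> >> >  import org.apache.samza.job.StreamJobFactory;
> >> >
> >> >  public class Resizer {
> >> >    static StreamJob resize(StreamJobFactory factory, StreamJob running,
> >> >                            Config oldConfig, int containers) {
> >> >      running.kill(); // (1) stop all containers
> >> >      Map<String, String> cfg = new HashMap<>(oldConfig);
> >> >      // (2) new container count drives a fresh partition assignment
> >> >      cfg.put("yarn.container.count", String.valueOf(containers));
> >> >      return factory.getJob(new MapConfig(cfg)).submit(); // (3) restart
> >> >    }
> >> >  }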
> >> >
> >> >In an ideal world, steps (1-3) would be handled automatically by Samza,
> >> >and wouldn't require container restarts. This is precisely what
> >> >samza-standalone will accomplish. If you're interested in contributing
> >> >to samza-standalone, that would be awesome. I'm working on a design doc
> >> >right now, which I'm trying to post by EOW. Once that's done, we can
> >> >collaborate on design and split the code up, if you'd like.
> >> >
> >> >
> >> >Cheers,
> >> >Chris
> >> >
> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
> >> >
> >> >>Hi Samza Devs
> >> >>
> >> >>The significant concern I've run into recently is container leaks. The
> >> >>data pipeline based on Samza can guarantee at-least-once delivery, but
> >> >>the duplicate rate is over 1.0% and I am getting alerts right now.
> >> >>Container leaks will push a lot of alerts to me.
> >> >>
> >> >>So, we need to verify that running Samza on Mesos won't create that
> >> >>problem, or that Spark Streaming won't have that issue. In the worst
> >> >>case, creating our own distributed coordination might be more
> >> >>predictable than running YARN on EMR.
> >> >>
> >> >>What about standalone Samza? If this is quite plausible and the best
> >> >>solution in the near future, I want to be able to contribute. Could
> >> >>you share your thoughts or plans?
> >> >>
> >> >>I would really appreciate it if you could give me some guidelines on
> >> >>implementing a custom cluster management interface for Samza. If it's
> >> >>possible, I want to take a look at replacing YARN support with EC2 ASG
> >> >>stuff.
> >> >>
> >> >>Thank you
> >> >>Best, Jae
> >> >
> >>
> >>
> >
>
