Hi Kishore,

I tried the new changes and they worked great!

I wanted to review the changes, but it would be much easier using reviews.apache.org; could you upload the diff there?

I also noticed there are dependencies on Helix from s4-core and s4-comm. How hard would it be to extract the Helix-specific bits into a separate module, similar to the YARN integration?

Thanks a lot for your work, it's a very nice integration!

Regards,
Daniel

On 1/3/13 9:06 AM, kishore g wrote:
Hi Guys,

Happy new year! I pushed the changes to the S4-110 branch. I have added
instructions for testing the addition of nodes. I also added support for
tagging nodes with a group name, which allows one to specify the node
group at task creation/app deployment.
Does it make sense?

thanks,
Kishore G


On Wed, Dec 5, 2012 at 10:20 AM, kishore g <g.kish...@gmail.com> wrote:

    You were brave enough to try them. You did the right thing for
    adding nodes, disabling a node, and swapping nodes. For rebalancing
    the App, we need to change the DeployApp code to add the new node.
    The reason rebalance did not work for the App is that it is set up
    as a CUSTOM assignment, which means S4 owns the mapping of Apps to
    Nodes. One way to work around this is to run DeployApp again, which
    will deploy the App to the new nodes.

    Using Helix commands directly will work, but it's better to provide
    S4-specific commands for the operations we intend to support.

    Thanks again for trying it out.

    thanks,
    Kishore G


    On Wed, Dec 5, 2012 at 7:42 AM, Daniel Gómez Ferro <danie...@yahoo-inc.com> wrote:

        Hi Kishore,

        I just tried it and I think it is very promising!

        I followed the README and everything worked (apart from the
        adapter, which you mentioned). Then I wanted to make a few
        changes to the partitioning, but I couldn't get it to work,
        probably because I don't understand Helix. I'm sure I wasn't
        supposed to do most of the things I did, but maybe it's valuable
        to you! :)

        First I added a new node using the helix-admin command and
        started the node. One of the partitions got assigned to the new
        node (and unassigned from the old one), but the new node didn't
        deploy the application. I tried doing a rebalance of myApp, but
        that somehow messed up the S4 configuration.

        If I configure the cluster for 3 nodes and start only 2, it
        works great, even after starting the 3rd one (it rebalances the
        partitions correctly).

        I also tried swapping two instances and that also worked (I
        think!). My only comment is that at first I had no idea how to
        disable an instance (it's done with --enable <cluster>
        <instance> false).
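
        For reference, a minimal sketch of the same operation through
        the Helix Java admin API (the ZooKeeper address and the cluster
        and instance names here are just placeholders):

            import org.apache.helix.HelixAdmin;
            import org.apache.helix.manager.zk.ZKHelixAdmin;

            public class DisableNode {
                public static void main(String[] args) {
                    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
                    // false disables the instance, true re-enables it
                    admin.enableInstance("s4-cluster", "localhost_12000", false);
                }
            }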

        My guess is that I shouldn't be using the helix-admin command
        directly. Do we need to provide custom "s4 <command>" wrappers
        for rebalancing, bringing down nodes, etc.?

        I think the work is great, sorry I couldn't resist playing with
        it! It would be great to have a short guide on how to do a basic
        rebalancing operation and so on.

        Regards,

        Daniel


        On Tue Dec  4 20:00:04 2012, kishore g wrote:

            Daniel, did you get a chance to try it out? I have fixed
            some issues in the emitter code to use the partitioning per
            stream. Would love to hear your comments.

            I will create a branch S4-110 with my changes.


            On Fri, Nov 30, 2012 at 2:06 PM, kishore g <g.kish...@gmail.com> wrote:

                Agree.


                On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel <mmo...@apache.org> wrote:

                    Sorry, I had replied to the user list!

                    Begin forwarded message:

                        From: Matthieu Morel <mmo...@apache.org>
                        Subject: Re: Using Helix for cluster management of S4
                        Date: November 30, 2012 7:56:39 PM GMT+01:00
                        To: s4-u...@incubator.apache.org


                        Regarding redistribution of partitions with
                        stateful PEs, another simple solution is to
                        activate checkpointing.

                        Then when you need to repartition:
                        - on the origin node, upon a rebalancing request
                        for partition p, you simply invalidate PE
                        instances from partition p in the local cache
                        (and trigger checkpointing if necessary)
                        - on the destination node, there is nothing to
                        do: state for PEs belonging to partition p is
                        lazily reloaded (i.e. when necessary).

                        Of course there are other solutions, such as
                        manually snapshotting to a data store, but that
                        may be more complex.
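
                        A minimal sketch of that checkpoint-then-invalidate
                        handoff, assuming checkpointing is enabled; PECache
                        and StatefulPE are illustrative stand-ins, not the
                        actual S4 classes:

                            import java.util.List;

                            interface StatefulPE {
                                // persist PE state to the checkpoint store
                                void checkpoint();
                            }

                            interface PECache {
                                List<StatefulPE> instancesFor(int partition);
                                void invalidate(StatefulPE pe); // drop from local cache
                            }

                            class RepartitionHandler {
                                private final PECache cache;
                                RepartitionHandler(PECache cache) { this.cache = cache; }

                                // Origin node: called when asked to give up partition p
                                void releasePartition(int p) {
                                    for (StatefulPE pe : cache.instancesFor(p)) {
                                        pe.checkpoint();      // snapshot if needed
                                        cache.invalidate(pe);
                                    }
                                    // Destination node does nothing eagerly: state for
                                    // partition p is lazily reloaded from the checkpoint
                                    // store when the next event arrives.
                                }
                            }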


                        Matthieu

                        On Nov 30, 2012, at 7:44 PM, kishore g wrote:


                            Matt,

                            That's right, the key idea is to over-partition
                            the stream (number of partitions higher than the
                            number of nodes).

                            Helix supports two modes. In the first, when a
                            new node is added Helix automatically moves
                            partitions; we call this AUTO_REBALANCE, and it
                            is recommended for stateless tasks. The other
                            mode is SEMI_AUTO, where you change the topology
                            using an admin command. So what one would do is
                            add a new node and then rebalance the cluster by
                            invoking a Helix API; Helix will then
                            redistribute the partitions and, as I said
                            earlier, will minimize and coordinate the
                            movement. When I say coordinate, it will first
                            ask the old leader to stop processing the
                            partition, at which point it can snapshot its
                            state. Once that's done, Helix will ask the new
                            node to host the partition, where it can load
                            the snapshot. I will add this example to the
                            walkthrough instructions.
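
                            A rough sketch of those two steps with the Helix
                            Java admin API (0.6-era signatures; the cluster,
                            node, and stream names are made up, and the s4
                            CLI wrappers in the S4-110 branch may differ):

                                import org.apache.helix.HelixAdmin;
                                import org.apache.helix.manager.zk.ZKHelixAdmin;
                                import org.apache.helix.model.InstanceConfig;

                                public class AddNodeAndRebalance {
                                    public static void main(String[] args) {
                                        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
                                        // 1. register the new node with the cluster
                                        InstanceConfig node = new InstanceConfig("localhost_12002");
                                        node.setHostName("localhost");
                                        node.setPort("12002");
                                        admin.addInstance("s4-cluster", node);
                                        // 2. recompute the partition assignment (1 replica);
                                        // Helix then coordinates the stop/snapshot/host handoff
                                        admin.rebalance("s4-cluster", "myStream", 1);
                                    }
                                }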


                            Daniel,

                            Yes, that is possible. I think the createTask and
                            deployApp commands already take an optional
                            parameter to list a subset of nodes, but I think
                            I have only implemented it for deployApp. Adding
                            it to createTask (which assigns stream processing
                            to S4 nodes) is straightforward.

                            Matthieu mentioned that in the README
                            instructions the adapter command is invoking the
                            old code. I will make that change in some time.
                            If you are trying this now, run
                            GenericEventAdapter from Eclipse directly (the
                            same options hold good).

                            Yes, the JIRA is S4-110. I will add the
                            description. I am pretty sure there will be
                            issues :-)




                            On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro <danie...@yahoo-inc.com> wrote:

                            I agree with Matthieu, that's a really nice
                            integration!

                            I particularly like having different partitioning
                            schemes per stream. I guess it would be easy (or
                            at least possible) to implement some kind of
                            isolation where only a subset of nodes handles a
                            specific stream, for example (related to S4-91).

                            It looks really nice, I'm looking forward to
                            trying it. I'll give more feedback if I run into
                            any issues. I guess the right JIRA for that would
                            be S4-110, right? (It's missing a description!)

                            Good job!

                            Daniel


                            On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:

                            Thanks Kishore, that's a very interesting
                            contribution!

                            It's also very appropriate considering that S4 is
                            completely decentralized and that there is no
                            driving/scheduling entity: the logic is within
                            the nodes. So it's nice to have a way to easily
                            describe and define coordinated behaviors, and to
                            easily automate them.

                            About the partitioning, the key here, as I
                            understand it, is to have a number of partitions
                            higher than the number of nodes by default,
                            possibly several times higher, so a given node is
                            assigned multiple partitions. (In contrast, until
                            now in S4, nb partitions <= nb nodes, including
                            standby nodes.)

                            In the canonical example that you provide, how do
                            we proceed if we want to add another S4 node?
                            That's not clear to me, and it would help
                            understand how partitions are reassigned.

                            Thanks!

                            Matthieu




                            In S4 the number of partitions is fixed for all
                            streams and is dependent on the number of nodes
                            in the cluster. Adding new nodes to an S4 cluster
                            causes the number of partitions to change, which
                            results in a lot of data movement. For example,
                            if there are 4 nodes and you add another node,
                            then nearly all keys will be remapped, which
                            results in huge data movement, whereas ideally
                            only 20% of the data should move.
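
                            A quick back-of-the-envelope check of those
                            figures (assuming keys are spread uniformly and
                            mapped to nodes with key % numNodes):

                                public class RemapFraction {
                                    public static void main(String[] args) {
                                        int total = 100000, moved = 0;
                                        for (int key = 0; key < total; key++) {
                                            // owner before (4 nodes) vs. after (5 nodes)
                                            if (key % 4 != key % 5) moved++;
                                        }
                                        // prints ~80%: most keys change owner, versus
                                        // the ideal 20% (the new node's 1/5 share) when
                                        // the partition count is kept fixed and whole
                                        // partitions are migrated instead
                                        System.out.printf("%.0f%% moved%n", 100.0 * moved / total);
                                    }
                                }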

                            By using Helix, every stream can be partitioned
                            differently, independent of the number of nodes.
                            Helix distributes the partitions evenly among the
                            nodes. When new nodes are added, partitions can
                            be migrated to the new nodes without changing the
                            number of partitions, which minimizes the data
                            movement.

                            S4 handles failures by having standby nodes that
                            are idle most of the time and become active when
                            a node fails. Even though this works, it's not
                            ideal in terms of efficient hardware usage, since
                            the standby nodes are idle most of the time. It
                            also increases the failover time, since the PE
                            state has to be transferred to only one node.

                            Helix allows S4 to have active and standby nodes
                            at a partition level, so that all nodes can be
                            active while some partitions on each node are
                            active and some are in standby mode. When a node
                            fails, the partitions that were active on that
                            node will be evenly distributed among the
                            remaining nodes. This provides automatic load
                            balancing and also improves failover time, since
                            PE state can be transferred to multiple nodes in
                            parallel.
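
                            As a toy illustration of the failover difference
                            (the partition and node counts are hypothetical):

                                public class FailoverSpread {
                                    public static void main(String[] args) {
                                        int partitions = 12, nodes = 4;
                                        int activePerNode = partitions / nodes; // 3 each
                                        // Dedicated standby: one idle node receives all 3
                                        // partitions of the failed node, serially.
                                        // Partition-level standbys spread evenly: each of
                                        // the 3 survivors picks up 1 partition, and the
                                        // state transfers can run in parallel.
                                        int survivors = nodes - 1;
                                        System.out.println("extra partitions per survivor: "
                                                + (double) activePerNode / survivors); // 1.0
                                    }
                                }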

                            I have a prototype implementation here:
                            https://github.com/kishoreg/incubator-s4

                            Instructions to build it and try it out are in
                            the README.

                            More info on Helix can be found here:
                            http://helix.incubator.apache.org/


                            Helix can provide a lot of other functionality,
                            such as:

                            1. Configure the topology according to the use
                               case. For example, co-locate the partitions of
                               different streams to allow efficient joins, or
                               configure the number of standbys for each
                               partition based on the headroom available.
                            2. When new nodes are added, it can throttle the
                               data movement.
                            3. Comes with a large set of admin tools, like
                               enable/disable node, dynamically change the
                               topology, etc., and provides a REST interface
                               to manage the cluster.
                            4. Allows one to schedule custom tasks, like
                               snapshotting the PEs in a partition and
                               restoring from the snapshot.


                            Would like to get your feedback.

                            Thanks,
                            Kishore G








