Hi Kishore,

I tried the new changes and they worked great!

I'd like to review the changes, but it would be much easier using reviews.apache.org. Could you upload the diff there?

I also noticed that s4-core and s4-comm depend on Helix. How hard would it be to extract the Helix-specific bits into a separate module, similar to the YARN integration?

Thanks a lot for your work, it's a very nice integration!


On 1/3/13 9:06 AM, kishore g wrote:
Hi Guys,

Happy new year! I pushed the changes to the S4-110 branch. I have added
instructions for testing the addition of nodes. I also added support for
tagging nodes with a group name. This allows one to specify the node group
name at task creation/app deployment.
Does it make sense?

Kishore G

On Wed, Dec 5, 2012 at 10:20 AM, kishore g wrote:


    You were brave enough to try them. You did the right thing for
    adding nodes, disabling nodes and swapping nodes. For rebalancing
    the App, we need to change the DeployApp code to add the new node. The
    reason rebalance did not work for the App is that it's set to CUSTOM
    assignment, which means S4 owns the mapping of Apps to Nodes. One
    way to work around this is to run DeployApp again, and it will deploy
    the App to the new nodes.

    Using Helix commands directly will work, but it's better to provide
    S4-specific commands for the operations we intend to support.

    Thanks again for trying it out.

    Kishore G

    On Wed, Dec 5, 2012 at 7:42 AM, Daniel Gómez Ferro wrote:
    

        Hi Kishore,

        I just tried it and I think it is very promising!

        I followed the readme and everything worked (apart from the
        adapter which you mentioned). Then I wanted to make a few
        changes to the partitioning but I couldn't get it to work,
        probably because I don't understand Helix. I'm sure I wasn't
        supposed to do most of the things I did, but maybe it's valuable
        to you! :)

        First I added a new node using the helix-admin command and
        started the node. One of the partitions got assigned to the new
        node (and unassigned from the old one) but the new node didn't
        deploy the application. I tried doing a rebalance of myApp but
        that somehow messed up the S4 configuration.

        If I configure the cluster for 3 nodes and start only 2, it
        works great, even after starting the 3rd one (it rebalances the
        partitions correctly)

        I also tried swapping two instances and that also worked (I
        think!). The only comment is that at first I had no idea how to
        disable an instance (it's done with --enable <cluster>
        <instance> false).

        My guess is that I shouldn't be using the helix-admin command
        directly. Do we need to provide custom "s4 <command>" commands for
        rebalancing, bringing down nodes, etc.?

        I think the work is great, sorry I couldn't resist playing with
        it! It would be great to have a short guide on how to do a basic
        rebalancing operation and so on.



        On Tue Dec  4 20:00:04 2012, kishore g wrote:

            Daniel, did you get a chance to try it out? I have fixed
            some issues in the emitter code to use the partitioning per
            stream. Would love to hear your

            I will create a branch S4-110 with my changes.

            On Fri, Nov 30, 2012 at 2:06 PM, kishore g wrote:
            


                On Fri, Nov 30, 2012 at 10:57 AM, Matthieu Morel wrote:
                

                    Sorry, I had replied to the user list!

                    Begin forwarded message:

                        From: Matthieu Morel <mmo...@apache.org
                        Subject: Re: Using Helix for cluster management
                        of S4
                        Date: November 30, 2012 7:56:39 PM GMT+01:00
                        To: s4-u...@incubator.apache.org

                        Regarding redistribution of partitions with
                        stateful PEs, another simple solution is to
                        activate checkpointing.

                        Then when you need to repartition:
                        - on the origin node, upon a rebalancing request
                          for partition p, you simply invalidate PE
                          instances from partition p from the local cache
                          (and trigger checkpointing if necessary)
                        - on the destination node, there is nothing to
                          do: state for PEs belonging to partition p is
                          lazily reloaded (i.e. when necessary).

                        Of course there are other solutions, such as
                        manually snapshotting to a data store, but that
                        may be more complex.
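
The checkpoint-based repartitioning described above can be sketched roughly as follows. This is a toy model, not S4 code: the `Node` class, the shared `store` dict standing in for a checkpoint backend, and the per-key counter used as PE state are all assumptions made for illustration.

```python
class Node:
    """Toy S4-like node holding PE state in a local cache (hypothetical)."""

    def __init__(self, name, checkpoint_store):
        self.name = name
        self.store = checkpoint_store   # stand-in for a shared checkpoint backend
        self.cache = {}                 # (partition, key) -> PE state

    def _pe(self, partition, key):
        pe_id = (partition, key)
        if pe_id not in self.cache:
            # Lazy reload: restore from the last checkpoint, or start fresh.
            self.cache[pe_id] = dict(self.store.get(pe_id, {"count": 0}))
        return self.cache[pe_id]

    def process(self, partition, key):
        """Handle one event for `key`; here the PE just counts events."""
        pe = self._pe(partition, key)
        pe["count"] += 1
        return pe["count"]

    def release_partition(self, partition):
        """Origin side: checkpoint, then invalidate, every PE of `partition`."""
        for pe_id in [i for i in self.cache if i[0] == partition]:
            self.store[pe_id] = dict(self.cache.pop(pe_id))


store = {}
origin, destination = Node("origin", store), Node("destination", store)
origin.process(7, "user-1")
origin.process(7, "user-1")
origin.release_partition(7)                # rebalancing request for partition 7
print(destination.process(7, "user-1"))    # 3: state was reloaded on first use
```

Nothing is pushed to the destination node: it only reads the checkpoint when the first event for a given PE arrives, which is what makes this approach simple.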


                        On Nov 30, 2012, at 7:44 PM, kishore g wrote:


                            That's right, the key idea is to
                            over-partition the stream (number of
                            partitions higher than the number of nodes).

                            Helix supports two modes. In the first, when a
                            new node is added, Helix automatically moves
                            partitions. We call this AUTO_REBALANCE, and it
                            is recommended for stateless tasks. The other
                            mode is SEMI_AUTO, where you change the
                            topology using an admin command. So what one
                            would do is add a new node and then rebalance
                            the cluster by invoking a Helix API. Helix
                            will then redistribute the partitions and, as
                            I said earlier, will minimize and coordinate
                            the movement. By coordinate, I mean that it
                            will first ask the old leader to stop
                            processing the partition. The old leader can
                            then snapshot its state. Once that's done,
                            Helix will ask the new node to host the
                            partition, where it can load the snapshot. I
                            will add this example to the walk through
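
The ordering of the coordinated move described above (stop on the old owner, snapshot, then start on the new owner) can be illustrated with a small sketch. It does not use the Helix API; `Host`, `hand_off` and the `snapshots` dict are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Host:
    name: str
    active: set = field(default_factory=set)    # partitions being processed
    state: dict = field(default_factory=dict)   # partition -> in-memory state


def hand_off(partition, old, new, snapshots):
    """Move `partition` from `old` to `new` and return the ordered steps."""
    steps = []
    old.active.discard(partition)                        # 1. old owner stops
    steps.append(("stop", old.name))
    snapshots[partition] = old.state.pop(partition, {})  # 2. old owner snapshots
    steps.append(("snapshot", old.name))
    new.state[partition] = dict(snapshots[partition])    # 3. new owner loads
    new.active.add(partition)
    steps.append(("start", new.name))
    return steps


old = Host("node-1", active={3}, state={3: {"count": 42}})
new = Host("node-2")
print(hand_off(3, old, new, snapshots={}))
print(new.state[3])
```

Because the old owner is stopped before the new one starts, the partition is never processed by two nodes at the same time in this model.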


                            Yes, that is possible. I think the createTask
                            and deployApp commands already take an
                            optional parameter to list a subset of nodes,
                            but I think I have only implemented it for
                            deployApp. Adding it for createTask (which
                            assigns stream processing to S4 nodes) is
                            straightforward.

                            Matthieu mentioned that in the Readme
                            instructions the adapter command is invoking
                            the old code. I will make that change in some
                            time. If you are trying this now, then run
                            GenericEventAdapter from Eclipse directly (the
                            same options hold good).

                            Yes, the JIRA is 110. I will add the
                            description. I am pretty sure there will be
                            issues :-)

                            On Fri, Nov 30, 2012 at 8:38 AM, Daniel Gómez Ferro wrote:

                    

                            I agree with Matthieu, that's a really nice

                            I particularly like having different
                            partition schemes per stream. I guess it would
                            be easy (or at least possible) to implement
                            some kind of isolation where only a subset of
                            nodes handles a specific stream, for example
                            (related to S4-91).

                            It looks really nice, I'm looking forward to
                            trying it. I'll give more feedback if I run
                            into any issues. I guess the right JIRA for
                            that would be S4-110, right? (It's missing a
                            description!)

                            Good job!


                            On Fri Nov 30 16:37:11 2012, Matthieu Morel wrote:
                            Thanks Kishore, that's a very interesting

                            It's also very appropriate considering that
                            S4 is completely decentralized and that there
                            is no driving/scheduling entity: the logic is
                            within the nodes. So it's nice to have a way
                            to easily describe and define coordinated
                            behaviors, and to easily automate them.

                            About the partitioning, the key here, as I
                            understand it, is to have a number of
                            partitions higher than the number of nodes by
                            default, possibly several times higher. So a
                            given node is assigned multiple partitions.
                            (In contrast, until now in S4, nb partitions
                            <= nb nodes, including standby

                            In the canonical example that you provide,
                            how do we proceed if we want to add another
                            S4 node? That's not clear to me, and it would
                            help understand how partitions are reassigned.



                            In S4 the number of partitions is fixed for
                            all streams and is

                    dependent on

                            the number of nodes in the cluster.  Adding
                            new nodes to S4 cluster


                            the number of partitions to change. This
                            results in lot of data


                            For example if there are 4 nodes and you add
                            another node then nearly


                            keys will be remapped, which results in huge
                            data movement, whereas


                            only 20% of the data should move.
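
To make the 4-to-5-node example concrete, here is a small sketch comparing the two schemes. It assumes keys are mapped to nodes as hash modulo the number of nodes (integer keys stand in for key hashes), and that the alternative uses a fixed count of 20 partitions. The function names and the simple rebalancing rule are illustrative and are not Helix's actual algorithm.

```python
from collections import Counter


def moved_fraction_mod_n(keys, old_nodes, new_nodes):
    """Fraction of keys whose owner changes when owner = key mod node count."""
    keys = list(keys)
    moved = sum(1 for k in keys if k % old_nodes != k % new_nodes)
    return moved / len(keys)


def add_node(assignment, new_node):
    """Give `new_node` an even share, moving as few partitions as possible."""
    result = dict(assignment)
    counts = Counter(assignment.values())
    target = len(assignment) // (len(counts) + 1)  # partitions per node afterwards
    moved = 0
    for partition in sorted(assignment):
        if moved == target:
            break
        owner = result[partition]
        if counts[owner] > target:     # only take from nodes above their share
            result[partition] = new_node
            counts[owner] -= 1
            moved += 1
    return result


keys = range(20000)                      # integer keys stand in for key hashes
print(moved_fraction_mod_n(keys, 4, 5))  # 0.8

partitions = 20                          # fixed, independent of the node count
before = {p: p % 4 for p in range(partitions)}
after = add_node(before, new_node=4)
moved = sum(1 for k in keys if before[k % partitions] != after[k % partitions])
print(moved / len(keys))                 # 0.2
```

With the partition count fixed, only the 4 partitions handed to the new node change owner, which is the 20% figure mentioned above.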

                            By using Helix, every stream can be
                            partitioned differently and
                            independently of the number of nodes. Helix
                            distributes the partitions


                            among the nodes. When new nodes are added,
                            partitions can be migrated


                            new nodes without changing the number of
                            partitions and  minimizes the



                            S4 handles failures by having standby
                            nodes that are idle most of


                            time and become active when a node fails.
                            Even though this works, its


                            ideal in terms of efficient hardware usage
                            since the standby nodes are idle most of the
                            time. This also increases the failover time,
                            since the PE state has to be transferred to
                            only one node.

                            Helix allows S4 to have Active and Standby
                            nodes at a partition level


                            that all nodes can be active, but some
                            partitions will be Active and some in
                            standby mode. When a node fails, the
                            partitions that were Active on


                            node will be evenly distributed among the
                            remaining nodes. This


                            automatic load balancing and also improves
                            fail over time, since PE


                            can be transferred to multiple nodes in parallel.
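
A minimal sketch of partition-level Active/Standby failover, under assumptions: the 3-node, 6-partition layout below is hypothetical, and `fail_over` only promotes existing Standby replicas (it does not create replacement Standbys, which a real controller would also have to do).

```python
from collections import Counter

# Hypothetical layout: one Active and one Standby replica per partition,
# always placed on different nodes.
ACTIVE = {0: "n0", 1: "n1", 2: "n2", 3: "n0", 4: "n1", 5: "n2"}
STANDBY = {0: "n1", 1: "n2", 2: "n0", 3: "n2", 4: "n0", 5: "n1"}


def fail_over(active, standby, failed_node):
    """Promote the Standby replica of every partition Active on `failed_node`."""
    return {
        partition: standby[partition] if owner == failed_node else owner
        for partition, owner in active.items()
    }


after = fail_over(ACTIVE, STANDBY, "n0")
print(after)
print(Counter(after.values()))
```

In this layout the two partitions that were Active on `n0` end up on two different nodes, so the surviving nodes share the extra load and each takes over only part of the failed node's state.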

                            I have a prototype implementation here

                            Instructions to build it and try it out are
                            in the Readme.

                            More info on Helix can be found here,


                            Helix can provide a lot of other
                            functionality, such as:

                                 1. Configure the topology according to
                            use case. For example,


                                 the partitions of different streams to
                            allow efficient joins.

                    Configure the

                                 number of standby for each partition
                            based on the head room


                                 2. When new nodes are added, it can
                            throttle the data movement
                                 3. Comes with a large set of admin tools,
                                 like enabling/disabling a node and
                                 dynamically changing the topology.
                                 Provides a REST interface to


                                 the cluster.
                                 4. Allows one to schedule custom tasks,
                                 like snapshotting the PEs in a
                                 partition and restoring from the snapshot.

                            Would like to get your feedback.

                            Kishore G

