Hi all,

Currently I'm working on an implementation of DiscoverySpi based on Apache
ZooKeeper (ZookeeperDiscoverySpi) and want to share the results of this work.

In very large clusters (>1000 nodes) the current default implementation of
DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
- TcpDiscoverySpi organizes nodes in a ring, and all messages are passed
sequentially around the ring. The more nodes there are in the ring, the more
time it takes to pass a message. In very large clusters such an architecture
can slow down important operations (node join, node stop, cache start, etc.).
- in TcpDiscoverySpi's protocol each node in the ring is able to fail the
next one in case of network issues, and it is possible that two nodes 'kill'
each other (this can happen in complex scenarios when the network is broken
and then restored after some time; such problems were observed in real
environments), and with the current TcpDiscoverySpi protocol there is no
easy way to completely fix such problems.
- when some node in the ring fails, the previous node tries to restore the
ring by sequentially connecting to the next nodes. If a large part of the
ring fails, it takes a long time to sequentially detect the failure of all
these nodes.
- with TcpDiscoverySpi split brain is possible (one ring can split into two
independent parts), so a separate mechanism is needed to protect against
split brain when TcpDiscoverySpi is used.

Even though some of these problems can most probably be fixed in
TcpDiscoverySpi, it seems a more robust and faster DiscoverySpi can be
implemented on top of an existing coordination service.

Apache ZooKeeper is a well-known reliable service and it provides all the
mechanisms and consistency guarantees required for Ignite's DiscoverySpi.
Some technical details of the ZookeeperDiscoverySpi implementation can be
found in the description prepared by Sergey Puchnin:
https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper

In our preliminary tests we were able to successfully start 4000+ nodes
with ZookeeperDiscoverySpi. The new implementation works faster than
TcpDiscoverySpi and does not have the TcpDiscoverySpi drawbacks mentioned
above:
- nodes' alive status is controlled by ZooKeeper, so nodes can never kill
each other
- ZooKeeper has protection against split brain
- with ZooKeeper it is possible to detect node joins/failures in batches,
so the time to detect the join/failure of 1 vs 100+ nodes is almost the same
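
For illustration, here is a minimal sketch of how the new SPI could be
plugged into a node configuration (the setZkConnectionString property is an
assumption based on the current prototype and may change in the final API):

// Assumed configuration of ZooKeeper-based discovery;
// property names may differ in the final implementation.
ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();

// Connection string of the ZooKeeper ensemble used for discovery.
zkSpi.setZkConnectionString("zk1:2181,zk2:2181,zk3:2181");

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(zkSpi);

Ignite ignite = Ignition.start(cfg);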

I'm going to finalize the implementation of ZookeeperDiscoverySpi in the
next few days. But in Ignite there is one more issue caused by the fact that
DiscoverySpi and CommunicationSpi are two independent components: it is
possible that DiscoverySpi considers some node alive, but at the same time
CommunicationSpi is not able to send messages to this node (this issue
exists not only for ZookeeperDiscoverySpi but for TcpDiscoverySpi too).
Such a case is very critical since all of Ignite's internal code assumes
that if a node is alive then CommunicationSpi is able to send/receive
messages to/from this node. If that is not the case, then any Ignite
operation can hang (with ZooKeeper such a situation is possible when, due
to network failures, nodes have a connection to ZooKeeper but cannot
establish TCP connections to each other).

If such a case arises, Ignite should have some mechanism to forcibly kill
faulty nodes to keep the cluster operational. But note that in 'split brain'
scenarios each independent part of the cluster will consider the others
'failed', and there should be some way to choose which part should be
killed. It would be good to provide a generic solution for this problem as
part of the work on the new DiscoverySpi.

We discussed this with Yakov Zhdanov and suggest the following: when
communication fails to send a message to some node and this node is still
considered alive, Ignite should trigger a global 'communication error
resolve' process (obviously, this process should use internal discovery
mechanisms for messaging). As part of this process CommunicationSpi on each
node should try to establish connections to all other alive nodes
(TcpCommunicationSpi can do this efficiently using async NIO) and send the
results of this connection test to some coordinator node (e.g. the oldest
cluster node). When the coordinator receives the connection test results
from all nodes, it calls a user-defined CommunicationProblemResolver to
choose which nodes should be killed (CommunicationProblemResolver should be
set in IgniteConfiguration):

public interface CommunicationProblemResolver {
    public void resolve(CommunicationProblemContext ctx);
}
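
As an example, a user-defined resolver could be plugged into the node
configuration roughly like this (setCommunicationProblemResolver is only an
assumption about how the new IgniteConfiguration property might look):

IgniteConfiguration cfg = new IgniteConfiguration();

// Assumed new configuration property for the resolver;
// MyCommunicationProblemResolver is a user-defined implementation.
cfg.setCommunicationProblemResolver(new MyCommunicationProblemResolver());

Ignition.start(cfg);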

CommunicationProblemResolver receives a CommunicationProblemContext which
provides the results of the CommunicationSpi connection test. It can also be
useful to have information about started caches and the current cache data
distribution to decide which part of the cluster should be killed:

public interface CommunicationProblemContext {
    /**
     * @return Current topology snapshot.
     */
    public List<ClusterNode> topologySnapshot();

    /**
     * @param node1 First node.
     * @param node2 Second node.
     * @return {@code True} if {@link CommunicationSpi} is able to
     *      establish connection from first node to second node.
     */
    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);

    /**
     * @return List of currently started caches.
     */
    public List<String> startedCaches();

    /**
     * @param cacheName Cache name.
     * @return Cache partitions affinity assignment.
     */
    public List<List<ClusterNode>> cacheAffinity(String cacheName);

    /**
     * @param cacheName Cache name.
     * @return Cache partitions owners.
     */
    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);

    /**
     * @param node Node to kill after communication error resolve.
     */
    public void killNode(ClusterNode node);
}

The default implementation of CommunicationProblemResolver provided as part
of Ignite can keep alive the largest sub-cluster in which all nodes are able
to connect to each other.
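
To make the proposal more concrete, below is a rough sketch of a custom
resolver built on top of the proposed interfaces; it keeps the largest group
of mutually reachable nodes and kills the rest (this is only an
illustration, not the actual default implementation):

class MyCommunicationProblemResolver implements CommunicationProblemResolver {
    @Override public void resolve(CommunicationProblemContext ctx) {
        List<ClusterNode> nodes = ctx.topologySnapshot();

        List<ClusterNode> best = new ArrayList<>();

        // Naive example: for each node collect all nodes it can talk to
        // in both directions and remember the largest such group.
        for (ClusterNode n1 : nodes) {
            List<ClusterNode> group = new ArrayList<>();

            for (ClusterNode n2 : nodes) {
                if (n1.equals(n2) ||
                    (ctx.connectionAvailable(n1, n2) && ctx.connectionAvailable(n2, n1)))
                    group.add(n2);
            }

            if (group.size() > best.size())
                best = group;
        }

        // Kill all nodes outside the selected group.
        for (ClusterNode node : nodes) {
            if (!best.contains(node))
                ctx.killNode(node);
        }
    }
}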

In addition to CommunicationProblemResolver we can fire a new local
org.apache.ignite.events.Event when CommunicationSpi fails to send a message
to an alive node (this can be useful for monitoring):

class CommunicationProblemEvent extends EventAdapter {
     ClusterNode eventNode();

     Exception connectionError();
}
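
Such an event could then be consumed with the usual local event listener
API, for example (EVT_COMMUNICATION_PROBLEM is a hypothetical constant for
the new event type):

// Listen for the new event locally; EVT_COMMUNICATION_PROBLEM is assumed.
ignite.events().localListen(evt -> {
    CommunicationProblemEvent evt0 = (CommunicationProblemEvent)evt;

    System.out.println("Failed to connect to node: " + evt0.eventNode() +
        ", error: " + evt0.connectionError());

    return true; // Keep listening.
}, EVT_COMMUNICATION_PROBLEM);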


Since this is a pretty large change in the public API, I would be grateful
if you could share your thoughts about CommunicationProblemResolver.

Thank you,
Semyon
