Vladimir,

> Another thing to consider is MVCC - under load partitions may have a lot of 
> entries which are not visible to anyone and are about to be removed, so there 
> is no need to transfer them at all.

Please share more details about how these entries occur. When do
they become invisible? From what I've found, they appear after an
entry is deleted, right? It looks like these entries will be gone
once the checkpoint ends, but I may be mistaken here. I'll try to
take this into account and update the IEP page.


> Do we have any analysis on where time is really spent during current 
> rebalance implementation?

Yes, I've collected some statistics on the rebalancing procedure and
also tested it with different combinations of the available rebalance
properties. I created the wiki page [1] describing the limitations of
the current rebalancing procedure and the advantages of the new one.
I haven't published all of my measurements yet, but please look at
the graph on that page. CPU consumption is higher on the demander
node than on the supplier node, and this is without any additional
load. I think it shows that saving entries one by one is not the
right strategy for cache data balancing. Therefore, I think we have
several options here (you already mentioned some of them): batch
entry processing, optimization of the internal data structures, or
avoiding that work entirely by transferring the stored files. We
already have tickets for the fuzzy free list implementation [2] and
batch entry processing [3]. Both back then and now, these changes
look more complex and risky to me (maybe I'm missing something and
they are easier). I think it's acceptable to start the cluster
rebalancing optimization (see the next comment for why) from the
persistence-enabled perspective by prototyping the proposed approach.


> But it is rather complex, doesn't help in-memory mode, and will not work for 
> other storage formats which we may potentially have in future (e.g. 
> tablespace approach).

I don't think that's entirely right. Yes, this proposal is only for
clusters with persistence enabled, but don't consider these changes a
huge monolithic update. From my point of view, it's a set of
independent features that will give Apache Ignite a competitive
advantage. For instance, the changes in the Checkpointer will give us
the opportunity to save snapshots of the persisted files under
checkpoint at some point in time (over the network or as a direct
copy to a file). As another example, the changes in CommunicationSpi
will allow us to open a channel connection between any pair of nodes
for any purpose (e.g. copying files with a zero-copy algorithm
without consuming node CPU resources, or sending arbitrary binary
data).
I've read your topic about the tablespace approach for cache groups
and I like this idea very much. With the new "file-segment-extent"
storage organization we would only need to change the Preloader
implementation (or write another one per storage organization type);
the other parts of the current proposal will work right out of the
box.
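
To make the zero-copy point concrete, here is a minimal JDK NIO sketch
(a plain SocketChannel, not the actual CommunicationSpi code; the host,
port and file name are placeholders) of pushing a file to a peer with
FileChannel#transferTo:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySendExample {
    public static void main(String[] args) throws IOException {
        Path part = Paths.get("part-0.bin"); // any partition file

        try (FileChannel src = FileChannel.open(part, StandardOpenOption.READ);
             SocketChannel dst = SocketChannel.open(
                 new InetSocketAddress("demander-host", 47100))) {
            long pos = 0;
            long size = src.size();

            // transferTo may send less than requested, so loop until the whole file is sent.
            while (pos < size)
                pos += src.transferTo(pos, size - pos, dst);
        }
    }
}

On Linux this maps to sendfile, so the bytes stay in the kernel and the
sending node's CPU is barely involved.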


I think we can get a huge rebalance speed improvement on very fast
SSDs, even bigger than with batched data processing on the demander
side or the fuzzy free list implementation. I'll try to prototype the
proposed solution and recheck all the measurements.

Please correct me where I am wrong.


[1] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
[2] https://issues.apache.org/jira/browse/IGNITE-9520
[3] https://issues.apache.org/jira/browse/IGNITE-7935
On Fri, 23 Nov 2018 at 14:03, Vladimir Ozerov <voze...@gridgain.com> wrote:
>
> Maxim,
>
> Do we have any analysis on where time is really spent during current
> rebalance implementation? Proposed change may work very well. But it is
> rather complex, doesn't help in-memory mode, and will not work for other
> storage formats which we may potentially have in future (e.g. tablespace
> approach). Another thing to consider is MVCC - under load partitions may
> have a lot of entries which are not visible to anyone and are about to be
> removed, so there is no need to transfer them at all.
>
> Did we investigate any universal and less intrusive approaches to rebalance
> speedup before that? For example:
> - batched data block reads on supplier
> - iteration over partition rather than cache data tree on supplier
> - batched data save on demander
> - delayed free list and index re-build in demander
>
> Vladimir.
>
> On Fri, Nov 23, 2018 at 1:08 PM Maxim Muzafarov <maxmu...@gmail.com> wrote:
>
> > Igniters,
> >
> >
> > I'd like to take the next step in increasing the rebalance speed of
> > Apache Ignite with persistence enabled. Currently, the rebalancing
> > procedure doesn't utilize the network and storage device throughput
> > to its full extent, even with reasonably large values of the
> > rebalanceThreadPoolSize property. As part of the previous discussion
> > `How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
> > idea [3] of transferring cache partition files over the network.
> > From my point of view, the case where this type of rebalancing
> > brings the most benefit is adding a completely new node or a set of
> > new nodes to the cluster. Such a scenario implies a full relocation
> > of cache partition files to the new node. To roughly estimate the
> > superiority of transmitting partition files over the network, the
> > native Linux scp/rsync commands can be used. My test environment
> > showed 270 MB/s for the new approach vs the current 40 MB/s
> > single-threaded rebalance speed.
> >
> >
> > I've prepared the design document IEP-28 [4] and accumulated all the
> > process details of the new rebalance approach on that page. Below
> > you can find the most significant details of the new rebalance
> > procedure and the components of Apache Ignite that are proposed to
> > be changed.
> >
> > Any feedback is very appreciated.
> >
> >
> > *PROCESS OVERVIEW*
> >
> > The whole process is described in terms of rebalancing a single
> > cache group; partition files are rebalanced one by one (a
> > supplier-side sketch is given right after this list):
> >
> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
> > supplier node;
> > 2. The supplier node receives the GridDhtPartitionDemandMessage and
> > starts a new checkpoint process;
> > 3. The supplier node creates an empty temporary cache partition file
> > with the .tmp suffix in the same cache persistence directory;
> > 4. The supplier node splits the whole cache partition file into
> > virtual chunks of a predefined size (a multiple of the PageMemory
> > page size);
> > 4.1. If a concurrent checkpoint thread determines the appropriate
> > cache partition file chunk and tries to flush a dirty page to the
> > cache partition file
> > 4.1.1. If the rebalance chunk has already been transferred
> > 4.1.1.1. Flush the dirty page to the file;
> > 4.1.2. If the rebalance chunk has not been transferred yet
> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > 4.1.2.2. Flush the dirty page to the file;
> > 4.2. The node starts sending each cache partition file chunk to the
> > demander node, one by one, using FileChannel#transferTo
> > 4.2.1. If the current chunk was modified by the checkpoint thread –
> > read it from the temporary cache partition file;
> > 4.2.2. If the current chunk was not touched – read it from the
> > original cache partition file;
> > 5. The demander node starts to listen for new incoming pipe
> > connections from the supplier node on TcpCommunicationSpi;
> > 6. The demander node creates a temporary cache partition file with
> > the .tmp suffix in the same cache persistence directory;
> > 7. The demander node receives each cache partition file chunk one by one
> > 7.1. The node checks the CRC of each PageMemory page in the
> > downloaded chunk;
> > 7.2. The node flushes the downloaded chunk at the appropriate cache
> > partition file position;
> > 8. When the demander node has received the whole cache partition file
> > 8.1. The node initializes the received .tmp file as its appropriate
> > cache partition file;
> > 8.2. A thread per partition begins to apply data entries from the
> > beginning of the temporary WAL storage;
> > 8.3. All async operations corresponding to this partition file still
> > write to the end of the temporary WAL;
> > 8.4. When the temporary WAL storage has been fully read and is about
> > to become empty
> > 8.4.1. Start the first checkpoint;
> > 8.4.2. Wait for the first checkpoint to end and own the cache partition;
> > 8.4.3. All operations are now switched to the partition file instead
> > of writing to the temporary WAL;
> > 8.4.4. Schedule the temporary WAL storage for deletion;
> > 9. The supplier node deletes the temporary cache partition file;
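> >
> > Here is the supplier-side sketch referenced above, covering the chunk
> > selection of steps 4.2.1-4.2.2 (all class and field names are invented
> > for illustration and are not the real Ignite code):
> >
> > import java.io.IOException;
> > import java.nio.channels.FileChannel;
> > import java.nio.channels.WritableByteChannel;
> > import java.util.BitSet;
> >
> > /** Sends one partition as fixed-size chunks, preferring the copy-on-write copies. */
> > class ChunkedPartitionSender {
> >     private final FileChannel partFile; // original partition file
> >     private final FileChannel tmpFile;  // .tmp file with the same layout, filled by the checkpointer
> >     private final BitSet cowChunks;     // chunks that the checkpointer copied before overwriting
> >     private final int chunkSize;
> >
> >     ChunkedPartitionSender(FileChannel partFile, FileChannel tmpFile,
> >         BitSet cowChunks, int chunkSize) {
> >         this.partFile = partFile;
> >         this.tmpFile = tmpFile;
> >         this.cowChunks = cowChunks;
> >         this.chunkSize = chunkSize;
> >     }
> >
> >     void send(WritableByteChannel out) throws IOException {
> >         long size = partFile.size();
> >
> >         for (long pos = 0; pos < size; pos += chunkSize) {
> >             long len = Math.min(chunkSize, size - pos);
> >
> >             // 4.2.1: the chunk was modified by a checkpoint - stream the saved copy.
> >             // 4.2.2: the chunk is untouched - stream it from the original file.
> >             FileChannel src = cowChunks.get((int)(pos / chunkSize)) ? tmpFile : partFile;
> >
> >             long sent = 0;
> >             while (sent < len)
> >                 sent += src.transferTo(pos + sent, len - sent, out);
> >         }
> >     }
> > }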
> >
> >
> > *COMPONENTS TO CHANGE*
> >
> > CommunicationSpi
> >
> > To benefit from zero-copy, we must delegate the file transfer to
> > FileChannel#transferTo(long, long,
> > java.nio.channels.WritableByteChannel), because the fast path of
> > the transferTo method is only taken when the destination channel is
> > one of the internal JDK channel implementations.
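> >
> > For illustration only, a minimal receiver loop on plain JDK NIO (the
> > port and file name are placeholders, not the real SPI code):
> >
> > import java.io.IOException;
> > import java.net.InetSocketAddress;
> > import java.nio.channels.FileChannel;
> > import java.nio.channels.ServerSocketChannel;
> > import java.nio.channels.SocketChannel;
> > import java.nio.file.Paths;
> > import java.nio.file.StandardOpenOption;
> >
> > public class PartitionFileReceiver {
> >     public static void main(String[] args) throws IOException {
> >         try (ServerSocketChannel server = ServerSocketChannel.open()) {
> >             server.bind(new InetSocketAddress(47100));
> >
> >             try (SocketChannel in = server.accept();
> >                  FileChannel dst = FileChannel.open(Paths.get("part-0.bin.tmp"),
> >                      StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
> >                 long pos = 0;
> >
> >                 // Drain the socket straight into the .tmp file; the real protocol
> >                 // would know the chunk offsets and sizes from a handshake message.
> >                 for (long n; (n = dst.transferFrom(in, pos, 1024 * 1024)) > 0; )
> >                     pos += n;
> >             }
> >         }
> >     }
> > }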
> >
> > Preloader
> >
> > A new implementation of the cache entries preloader is assumed. The
> > new implementation must send and receive cache partition files over
> > the CommunicationSpi channels in chunks of data, validating the
> > received items. The new layer over the cache partition file must
> > support using the FileChannel#transferTo method directly over the
> > CommunicationSpi pipe connection. The connection bandwidth of the
> > cache partition file transfer must be limitable at runtime.
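> >
> > A sketch of per-page validation of a received chunk (CRC32 and the
> > separate expected-checksum array are only illustrative; the real
> > implementation would take the expected value from the page itself):
> >
> > import java.nio.ByteBuffer;
> > import java.util.zip.CRC32;
> >
> > /** Validates every page inside a received chunk before it is flushed to disk. */
> > class ChunkValidator {
> >     private final int pageSize;
> >
> >     ChunkValidator(int pageSize) {
> >         this.pageSize = pageSize;
> >     }
> >
> >     /**
> >      * @param chunk Received chunk; its size is a multiple of the page size.
> >      * @param expectedCrcs Expected checksum of every page in the chunk.
> >      */
> >     void validate(ByteBuffer chunk, long[] expectedCrcs) {
> >         for (int i = 0; i < expectedCrcs.length; i++) {
> >             // Slice out the i-th page of the chunk without copying it.
> >             ByteBuffer page = chunk.duplicate();
> >             page.position(i * pageSize).limit((i + 1) * pageSize);
> >
> >             CRC32 crc = new CRC32();
> >             crc.update(page);
> >
> >             if (crc.getValue() != expectedCrcs[i])
> >                 throw new IllegalStateException("CRC mismatch in page " + i
> >                     + ", the chunk must be re-requested");
> >         }
> >     }
> > }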
> >
> > Checkpointer
> >
> > When the supplier node receives the cache partition file demand
> > request, it will send the file over the CommunicationSpi. The cache
> > partition file can be concurrently updated by the checkpoint thread
> > during its transmission. To guarantee file consistency, the
> > Checkpointer must use a copy-on-write technique and save a copy of
> > the updated chunk into the temporary file.
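> >
> > A minimal sketch of that copy-on-write decision in the checkpointer
> > write path (class and field names are invented for the example):
> >
> > import java.io.IOException;
> > import java.nio.ByteBuffer;
> > import java.nio.channels.FileChannel;
> > import java.util.BitSet;
> >
> > /** Preserves the original chunk before the checkpoint overwrites it. */
> > class CopyOnWriteWriter {
> >     private final FileChannel partFile; // partition file being transferred
> >     private final FileChannel tmpFile;  // .tmp file with the saved chunk copies
> >     private final BitSet transferred;   // chunks already sent to the demander
> >     private final BitSet copied;        // chunks already copied to the .tmp file
> >     private final int chunkSize;
> >
> >     CopyOnWriteWriter(FileChannel partFile, FileChannel tmpFile,
> >         BitSet transferred, BitSet copied, int chunkSize) {
> >         this.partFile = partFile;
> >         this.tmpFile = tmpFile;
> >         this.transferred = transferred;
> >         this.copied = copied;
> >         this.chunkSize = chunkSize;
> >     }
> >
> >     synchronized void writeDirtyPage(ByteBuffer page, long pageOff) throws IOException {
> >         int chunkIdx = (int)(pageOff / chunkSize);
> >
> >         // The chunk has not been sent yet: save its original content first, so the
> >         // sender can still stream a consistent copy of it from the .tmp file.
> >         if (!transferred.get(chunkIdx) && !copied.get(chunkIdx)) {
> >             long chunkOff = (long)chunkIdx * chunkSize;
> >             long len = Math.min(chunkSize, partFile.size() - chunkOff);
> >
> >             long done = 0;
> >             while (done < len)
> >                 done += partFile.transferTo(chunkOff + done, len - done,
> >                     tmpFile.position(chunkOff + done));
> >
> >             copied.set(chunkIdx);
> >         }
> >
> >         // In both cases the dirty page itself goes to the original partition file.
> >         partFile.write(page, pageOff);
> >     }
> > }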
> >
> > (new) Catch-up temporary WAL
> >
> > While the demander node is in the partition file transmission
> > state, it must save all cache entries corresponding to the moving
> > partition into a new temporary WAL storage. These entries will
> > later be applied one by one to the received cache partition file.
> > All asynchronous operations will be appended to the end of the
> > temporary WAL storage while it is being read, until it becomes
> > fully read. A file-based FIFO approach is assumed for this process.
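> >
> > A sketch of the file-based FIFO idea (record serialization is reduced
> > to plain byte arrays; the real storage would keep WAL records):
> >
> > import java.io.IOException;
> > import java.nio.ByteBuffer;
> > import java.nio.channels.FileChannel;
> > import java.nio.file.Path;
> > import java.nio.file.StandardOpenOption;
> >
> > /** Append-only catch-up log: writers append to the tail, the applier reads from the head. */
> > class CatchUpWal {
> >     private final FileChannel file;
> >     private long readPos;  // head: offset of the next record to apply
> >     private long writePos; // tail: offset of the next append
> >
> >     CatchUpWal(Path path) throws IOException {
> >         file = FileChannel.open(path, StandardOpenOption.CREATE,
> >             StandardOpenOption.READ, StandardOpenOption.WRITE);
> >     }
> >
> >     /** Called for every update that arrives while the partition file is in transit. */
> >     synchronized void append(byte[] record) throws IOException {
> >         ByteBuffer buf = ByteBuffer.allocate(4 + record.length);
> >         buf.putInt(record.length).put(record).flip();
> >
> >         while (buf.hasRemaining())
> >             writePos += file.write(buf, writePos);
> >     }
> >
> >     /** Returns the next record to apply, or null when the log is fully drained. */
> >     synchronized byte[] poll() throws IOException {
> >         if (readPos >= writePos)
> >             return null; // fully read: time to switch writes to the partition file
> >
> >         ByteBuffer len = ByteBuffer.allocate(4);
> >         file.read(len, readPos);
> >         len.flip();
> >
> >         ByteBuffer rec = ByteBuffer.allocate(len.getInt());
> >         while (rec.hasRemaining())
> >             file.read(rec, readPos + 4 + rec.position());
> >
> >         readPos += 4 + rec.capacity();
> >
> >         return rec.array();
> >     }
> > }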
> >
> >
> > *RECOVERY*
> >
> > In case of crash recovery, no additional actions need to be applied
> > to keep the cache partition file consistent. We do not recover
> > partitions in the moving state, so only the single partition file
> > being transferred will be lost, and nothing else. This is
> > guaranteed by the single-file-transmission process. The cache
> > partition file will be fully loaded on the next rebalance
> > procedure.
> >
> > To provide the default cluster recovery guarantee we must:
> > 1. Start the checkpoint process when the temporary WAL storage
> > becomes empty;
> > 2. Wait for the first checkpoint to end and set the owning status
> > on the cache partition;
> >
> >
> >
> >
> > [1]
> > http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > [2]
> > https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > [4]
> > https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> >
