
I'd like to take the next step in increasing the rebalance speed of
Apache Ignite with persistence enabled. Currently, the rebalancing
procedure doesn't utilize the network and storage device throughput to
its full extent, even with reasonably large values of the
rebalanceThreadPoolSize property. As part of the previous discussion
`How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
idea [3] of transferring cache partition files over the network.
From my point of view, the case in which this type of rebalancing
procedure can bring the most benefit is adding a completely new node
or a set of new nodes to the cluster. Such a scenario implies full
relocation of cache partition files to the new node. To roughly
estimate the advantage of transmitting partition files over the
network, the native Linux scp/rsync commands can be used. In my test
environment the new approach showed 270 MB/s vs. the current 40 MB/s
single-threaded rebalance speed.

I've prepared the design document IEP-28 [4] and collected all the
process details of the new rebalance approach on that page. Below you
can find the most significant details of the new rebalance procedure
and the components of Apache Ignite that are proposed to change.

Any feedback is much appreciated.


The whole process is described in terms of rebalancing a single cache
group; partition files are rebalanced one by one:

1. The demander node sends the GridDhtPartitionDemandMessage to the
supplier node;
2. The supplier node receives the GridDhtPartitionDemandMessage and
starts a new checkpoint process;
3. The supplier node creates an empty temporary cache partition file
with the .tmp postfix in the same cache persistence directory;
4. The supplier node splits the whole cache partition file into
virtual chunks of a predefined size (a multiple of the PageMemory
page size);
4.1. If the concurrent checkpoint thread determines the appropriate
cache partition file chunk and tries to flush a dirty page to the
cache partition file:
4.1.1. If the rebalance chunk is already transferred: flush the dirty
page to the file;
4.1.2. If the rebalance chunk is not transferred yet: write this chunk
to the temporary cache partition file, then flush the dirty page to
the file;
4.2. The node starts sending each cache partition file chunk to the
demander node one by one using FileChannel#transferTo:
4.2.1. If the current chunk was modified by the checkpoint thread,
read it from the temporary cache partition file;
4.2.2. If the current chunk was not touched, read it from the original
cache partition file;
5. The demander node starts to listen for new incoming pipe
connections from the supplier node on TcpCommunicationSpi;
6. The demander node creates a temporary cache partition file with
the .tmp postfix in the same cache persistence directory;
7. The demander node receives each cache partition file chunk one by
one:
7.1. The node checks the CRC of each PageMemory page in the downloaded
chunk;
7.2. The node flushes the downloaded chunk at the appropriate cache
partition file position;
8. When the demander node has received the whole cache partition file:
8.1. The node initializes the received .tmp file as its appropriate
cache partition file;
8.2. Thread-per-partition begins to apply data entries from the
beginning of the temporary WAL storage;
8.3. All async operations corresponding to this partition file still
write to the end of the temporary WAL;
8.4. At the moment the temporary WAL storage is about to become empty:
8.4.1. Start the first checkpoint;
8.4.2. Wait for the first checkpoint to end and own the cache
partition;
8.4.3. All operations are now switched to the partition file instead
of writing to the temporary WAL;
8.4.4. Schedule the temporary WAL storage deletion;
9. The supplier node deletes the temporary cache partition file.
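
Step 7.1 (the per-page CRC check on the demander) could look roughly
like the sketch below. It is only an illustration, not Ignite code:
the class name ChunkCrcCheck, the 4096-byte page size and the layout
in which the last four bytes of every page hold a CRC32 of the rest of
the page are all assumptions made for the example. The real page
format and CRC routine are whatever the Ignite page store defines.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical helper: validates every page of a downloaded chunk. */
final class ChunkCrcCheck {
    /** Assumed page size; the real one comes from the storage configuration. */
    static final int PAGE_SIZE = 4096;

    /** Computes CRC32 over the page body (everything except the last 4 bytes). */
    private static int bodyCrc(byte[] chunk, int pageOff) {
        CRC32 crc = new CRC32();
        crc.update(chunk, pageOff, PAGE_SIZE - 4);
        return (int) crc.getValue();
    }

    /** Stores the body CRC in the last 4 bytes of the page (assumed layout). */
    static void sealPage(byte[] chunk, int pageOff) {
        ByteBuffer.wrap(chunk, pageOff + PAGE_SIZE - 4, 4).putInt(bodyCrc(chunk, pageOff));
    }

    /** Returns the index of the first corrupted page, or -1 if all pages are valid. */
    static int firstCorruptedPage(byte[] chunk) {
        if (chunk.length % PAGE_SIZE != 0)
            throw new IllegalArgumentException("Chunk size must be a multiple of the page size");

        for (int off = 0; off < chunk.length; off += PAGE_SIZE) {
            int stored = ByteBuffer.wrap(chunk, off + PAGE_SIZE - 4, 4).getInt();

            if (stored != bodyCrc(chunk, off))
                return off / PAGE_SIZE;
        }

        return -1;
    }
}
```

A demander would call firstCorruptedPage on each received chunk before
step 7.2 and re-request the chunk if the result is not -1.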



To benefit from zero copy we must delegate the file transfer to
FileChannel#transferTo(long, long,
java.nio.channels.WritableByteChannel), because the fast path of the
transferTo method is only executed if the destination channel inherits
from an internal JDK class.
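
A minimal sketch of sending one chunk with transferTo is shown below.
The class and method names (ChunkSender, sendChunk) are hypothetical
and not part of Ignite. Note that with an arbitrary
WritableByteChannel the JDK is expected to fall back to a copying
path; the zero-copy path should only apply when the target is a JDK
socket channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper: sends a region of a partition file to a channel. */
final class ChunkSender {
    /**
     * Sends bytes [pos, pos + len) of the file to the target channel.
     * transferTo may transfer fewer bytes than requested, so it is called in a loop.
     *
     * @return number of bytes actually sent.
     */
    static long sendChunk(FileChannel src, long pos, long len, WritableByteChannel target)
        throws IOException {
        long sent = 0;

        while (sent < len) {
            long n = src.transferTo(pos + sent, len - sent, target);

            // 0 means the position is at or past the end of the file, or a
            // non-blocking target cannot accept more bytes right now. A real
            // sender would wait for writability instead of giving up.
            if (n <= 0)
                break;

            sent += n;
        }

        return sent;
    }
}
```

The caller compares the returned value with the requested length to
detect a short transfer.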


A new implementation of the cache entries preloader is assumed. The
new implementation must send and receive cache partition files over
the CommunicationSpi channels in chunks of data, validating the
received items. The new layer over the cache partition file must
support direct use of the FileChannel#transferTo method over the
CommunicationSpi pipe connection. The connection bandwidth of the
cache partition file transfer must have the ability to be limited at


When the supplier node receives the cache partition file demand
request, it will send the file over the CommunicationSpi. The cache
partition file can be concurrently updated by the checkpoint thread
during its transmission. To guarantee file consistency, the
Checkpointer must use the copy-on-write technique and save a copy of
each updated chunk into the temporary file.
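
The copy-on-write rule can be modelled in memory as in the sketch
below. This is an assumption-laden illustration, not the proposed
implementation: a byte array stands in for the partition file, a map
stands in for the .tmp file, and the names CowPartition, flushPage and
nextChunk are invented for the example.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the supplier-side copy-on-write rule. */
final class CowPartition {
    private final byte[] file;                                   // stands in for the partition file
    private final int chunkSize;
    private final BitSet sent = new BitSet();                    // chunks already transferred
    private final Map<Integer, byte[]> saved = new HashMap<>();  // stands in for the .tmp file

    CowPartition(byte[] file, int chunkSize) {
        this.file = file;
        this.chunkSize = chunkSize;
    }

    /** Checkpointer path: preserve a not-yet-sent chunk, then flush the dirty page. */
    synchronized void flushPage(int off, byte[] page) {
        int first = off / chunkSize;
        int last = (off + page.length - 1) / chunkSize;

        for (int c = first; c <= last; c++) {
            if (!sent.get(c) && !saved.containsKey(c))
                saved.put(c, readFromFile(c));
        }

        System.arraycopy(page, 0, file, off, page.length);
    }

    /** Sender path: return the pre-checkpoint version of a chunk and mark it as sent. */
    synchronized byte[] nextChunk(int c) {
        byte[] copy = saved.remove(c);

        sent.set(c);

        return copy != null ? copy : readFromFile(c);
    }

    private byte[] readFromFile(int c) {
        int from = c * chunkSize;

        return Arrays.copyOfRange(file, from, Math.min(from + chunkSize, file.length));
    }
}
```

With this rule the demander always receives the chunk contents as they
were when the transfer started, while the checkpointer keeps writing
to the original file.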

(new) Catch-up temporary WAL

While the demander node is in the partition file transmission state, it
must save all cache entries corresponding to the moving partition into
a new temporary WAL storage. These entries will be applied later, one
by one, to the received cache partition file. While the storage is
being read, all asynchronous operations will be appended to the end of
the temporary WAL storage until it has been fully read. A file-based
FIFO approach is assumed to be used by this process.
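
One possible shape of such a file-based FIFO is sketched below, under
the assumption that records are stored as length-prefixed byte arrays.
The class name TempWalQueue and its methods are hypothetical; the
actual record format and storage layout are not defined in this
message.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

/** Hypothetical file-backed FIFO: append at the tail, read from the head. */
final class TempWalQueue implements AutoCloseable {
    private final RandomAccessFile raf;
    private long readPos;   // offset of the next unread record
    private long writePos;  // offset where the next record is appended

    TempWalQueue(Path file) throws IOException {
        raf = new RandomAccessFile(file.toFile(), "rw");
        raf.setLength(0); // start from an empty storage
    }

    /** Appends a record to the end of the storage. */
    synchronized void append(byte[] rec) throws IOException {
        raf.seek(writePos);
        raf.writeInt(rec.length);
        raf.write(rec);
        writePos = raf.getFilePointer();
    }

    /** Returns the oldest unread record, or null if everything has been read. */
    synchronized byte[] poll() throws IOException {
        if (readPos == writePos)
            return null;

        raf.seek(readPos);

        byte[] rec = new byte[raf.readInt()];

        raf.readFully(rec);
        readPos = raf.getFilePointer();

        return rec;
    }

    /** True when the reader has caught up with the writer. */
    synchronized boolean isDrained() {
        return readPos == writePos;
    }

    @Override public synchronized void close() throws IOException {
        raf.close();
    }
}
```

The applying thread would call poll in a loop while concurrent updates
keep calling append; isDrained corresponds to the moment when the
temporary WAL storage becomes empty.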


In case of crash recovery, no additional actions need to be applied to
keep the cache partition file consistent. We do not recover a
partition in the moving state, so only that single partition file will
be lost. That it is the only one is guaranteed by the
single-file-transmission process. The cache partition file will be
fully loaded by the next rebalance procedure.

To provide the default cluster recovery guarantee we must:
1. Start the checkpoint process when the temporary WAL storage becomes empty;
2. Wait for the first checkpoint to end and set the owning status on
the cache partition.

[3] https://issues.apache.org/jira/browse/IGNITE-8020
