After some discussion, I can describe another approach to building a
consistent partition copy on the fly.

1. We make a checkpoint and fix the size of the partition in OffheapManager.
2. After the checkpoint finishes, we start sending the partition file
(without any lock) to the receiver, from offset 0 up to the fixed size.
3. If subsequent checkpoints detect that they would overwrite pages of the
file being transferred, they should first write the previous state of each
such page to a dedicated file.
This gives us a list of pages written one by one; the page id is stored in
the page itself, so we can determine the page index. Let's call this file
the log.
4. When the transfer finishes, the checkpointer stops updating the log file.
Now we are ready to send it to the receiver.
5. On the receiver side, we merge the dirty partition file with the log
(overwriting its pages with the pages from the log file).

So, the advantages of this method:
- checkpoint-thread work can at most double;
- checkpoint threads don't have to wait for anything;
- in the best case, we receive the partition without any extra effort.
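
To make steps 1-5 above concrete, here is a minimal in-memory sketch in
Java. It is not Ignite code: the class PartitionLogSketch, its Supplier
helper, the 16-byte page with the page index in its first 4 bytes, and the
rule that only the first overwrite of a page is logged are all assumptions
made for illustration (the last one so that the log holds exactly the state
fixed at the checkpoint).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of the proposal; none of these names are Ignite APIs. */
final class PartitionLogSketch {
    static final int PAGE_SIZE = 16; // toy page size; the first 4 bytes hold the page index

    /** Builds a page whose header stores its own index, followed by payload bytes. */
    static byte[] page(int idx, byte payload) {
        ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE);
        buf.putInt(idx);
        while (buf.hasRemaining())
            buf.put(payload);
        return buf.array();
    }

    /** Supplier side: the live partition file plus the log of previous page states. */
    static final class Supplier {
        final List<byte[]> file = new ArrayList<>(); // one entry per page
        final List<byte[]> log = new ArrayList<>();  // previous states, appended one by one
        final Set<Integer> logged = new HashSet<>(); // assumption: log only the first overwrite
        int fixedSize = -1;                          // page count fixed at the checkpoint (step 1)
        boolean transferring;

        void beginTransfer() { fixedSize = file.size(); transferring = true; }

        void endTransfer() { transferring = false; } // step 4: stop updating the log

        /** A checkpoint writes a page; inside the fixed range the old state is logged first (step 3). */
        void checkpointWrite(int idx, byte payload) {
            if (transferring && idx < fixedSize && logged.add(idx))
                log.add(file.get(idx).clone());
            if (idx < file.size())
                file.set(idx, page(idx, payload));
            else
                file.add(page(idx, payload)); // sketch assumes idx == file.size() here
        }
    }

    /** Receiver side: overwrite received pages with the logged previous states (step 5). */
    static void merge(byte[][] received, List<byte[]> log) {
        for (byte[] p : log) {
            int idx = ByteBuffer.wrap(p).getInt(); // page index is read from the page itself
            received[idx] = p;
        }
    }

    /** Runs one transfer with concurrent checkpoint writes and returns the merged copy. */
    static byte[][] demo() {
        Supplier s = new Supplier();
        for (int i = 0; i < 4; i++)
            s.checkpointWrite(i, (byte) 1);      // state fixed by the checkpoint
        s.beginTransfer();
        byte[][] received = new byte[s.fixedSize][];
        received[0] = s.file.get(0).clone();     // step 2: send pages without any lock
        received[1] = s.file.get(1).clone();
        s.checkpointWrite(1, (byte) 9);          // page already sent
        s.checkpointWrite(3, (byte) 9);          // page not sent yet
        s.checkpointWrite(4, (byte) 9);          // beyond the fixed size, nothing is logged
        received[2] = s.file.get(2).clone();
        received[3] = s.file.get(3).clone();     // receiver gets the new (dirty) state
        s.endTransfer();
        merge(received, s.log);
        return received;
    }

    public static void main(String[] args) {
        byte[][] merged = demo();
        boolean consistent = true;
        for (int i = 0; i < merged.length; i++)
            consistent &= Arrays.equals(merged[i], page(i, (byte) 1));
        System.out.println("receiver copy matches checkpoint state: " + consistent);
    }
}
```

In demo(), page 3 reaches the receiver in its new state because it is
overwritten before being sent; merging the log restores the state that was
fixed at the checkpoint.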


On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
eduard.shangar...@gmail.com> wrote:

> Maxim,
>
> I have looked through your algorithm for reading a partition consistently,
> and I have some questions/comments.
>
> 1. The algorithm requires heavy synchronization between the checkpoint
> thread and the new-approach rebalance threads, because you need strong
> guarantees that neither side starts writing to or reading from a chunk
> that its counterpart has updated or has started reading.
>
> 2. Also, once we have started transferring a chunk, checkpoint threads
> can't update it in the original partition. They have to wait for the
> transfer to finish.
>
> 3. If sending is slow and the partition is being updated, then in the
> worst case checkpoint threads would create a whole copy of the partition.
>
> So, what we have:
> - on every page write, the checkpoint thread has to synchronize with the
> new-approach rebalance threads;
> - the checkpoint thread has to do extra work, which can sometimes be as
> much as copying the whole partition.
>
>
> On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <ilya.kasnach...@gmail.com>
> wrote:
>
>> Hello!
>>
>> This proposal will also happily break my compression-with-dictionary patch,
>> since it currently relies on having only local dictionaries.
>>
>> However, when you have compressed data, the speed boost with your approach
>> may be even greater.
>>
>> Regards,
>> --
>> Ilya Kasnacheev
>>
>>
>> On Fri, Nov 23, 2018 at 13:08, Maxim Muzafarov <maxmu...@gmail.com> wrote:
>>
>> > Igniters,
>> >
>> >
>> > I'd like to take the next step in increasing the rebalance speed of
>> > Apache Ignite with persistence enabled. Currently, the rebalancing
>> > procedure doesn't utilize the network and storage device throughput to
>> > its full extent, even with sufficiently high values of the
>> > rebalanceThreadPoolSize property. As part of the previous discussion
>> > `How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
>> > idea [3] of transferring cache partition files over the network.
>> > From my point of view, the case where this type of rebalancing
>> > procedure can bring the most benefit is adding a completely new node
>> > or a set of new nodes to the cluster. Such a scenario implies full
>> > relocation of cache partition files to the new node. To roughly
>> > estimate the advantage of transmitting partition files over the
>> > network, the native Linux scp/rsync commands can be used. My test
>> > environment showed 270 MB/s for the new approach vs the current
>> > 40 MB/s single-threaded rebalance speed.
>> >
>> >
>> > I've prepared the design document IEP-28 [4] and collected all the
>> > process details of the new rebalance approach on that page. Below you
>> > can find the most significant details of the new rebalance procedure
>> > and the components of Apache Ignite that are proposed to be changed.
>> >
>> > Any feedback is much appreciated.
>> >
>> >
>> > *PROCESS OVERVIEW*
>> >
>> > The whole process is described in terms of rebalancing a single cache
>> > group; partition files are rebalanced one by one:
>> >
>> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
>> > supplier node;
>> > 2. The supplier node receives the GridDhtPartitionDemandMessage and
>> > starts a new checkpoint process;
>> > 3. The supplier node creates an empty temporary cache partition file
>> > with the .tmp postfix in the same cache persistence directory;
>> > 4. The supplier node splits the whole cache partition file into
>> > virtual chunks of a predefined size (a multiple of the PageMemory page
>> > size);
>> > 4.1. If a concurrent checkpoint thread determines the corresponding
>> > cache partition file chunk and tries to flush a dirty page to the cache
>> > partition file:
>> > 4.1.1. If the rebalance chunk has already been transferred:
>> > 4.1.1.1. Flush the dirty page to the file;
>> > 4.1.2. If the rebalance chunk has not been transferred yet:
>> > 4.1.2.1. Write this chunk to the temporary cache partition file;
>> > 4.1.2.2. Flush the dirty page to the file;
>> > 4.2. The node starts sending each cache partition file chunk to the
>> > demander node one by one using FileChannel#transferTo:
>> > 4.2.1. If the current chunk was modified by the checkpoint thread, read
>> > it from the temporary cache partition file;
>> > 4.2.2. If the current chunk was not touched, read it from the original
>> > cache partition file;
>> > 5. The demander node starts listening for new incoming pipe connections
>> > from the supplier node on TcpCommunicationSpi;
>> > 6. The demander node creates the temporary cache partition file with
>> > the .tmp postfix in the same cache persistence directory;
>> > 7. The demander node receives the cache partition file chunks one by one:
>> > 7.1. The node checks the CRC of each PageMemory page in the downloaded
>> > chunk;
>> > 7.2. The node flushes the downloaded chunk at the appropriate cache
>> > partition file position;
>> > 8. When the demander node has received the whole cache partition file:
>> > 8.1. The node initializes the received .tmp file as its corresponding
>> > cache partition file;
>> > 8.2. Thread-per-partition begins to apply data entries from the
>> > beginning of the temporary WAL storage;
>> > 8.3. All async operations corresponding to this partition file still
>> > write to the end of the temporary WAL;
>> > 8.4. At the moment the temporary WAL storage is about to become empty:
>> > 8.4.1. Start the first checkpoint;
>> > 8.4.2. Wait for the first checkpoint to end and own the cache partition;
>> > 8.4.3. All operations are now switched to the partition file instead
>> > of writing to the temporary WAL;
>> > 8.4.4. Schedule the deletion of the temporary WAL storage;
>> > 9. The supplier node deletes the temporary cache partition file.
>> >
>> >
>> > *COMPONENTS TO CHANGE*
>> >
>> > CommunicationSpi
>> >
>> > To benefit from zero copy we must delegate the file transfer to
>> > FileChannel#transferTo(long, long,
>> > java.nio.channels.WritableByteChannel), because the fast path of the
>> > transferTo method is only executed if the destination channel inherits
>> > from an internal JDK class.
>> >
>> > Preloader
>> >
>> > A new implementation of the cache entries preloader is assumed. The
>> > new implementation must send and receive cache partition files over
>> > the CommunicationSpi channels in chunks of data, with validation of the
>> > received items. The new layer over the cache partition file must
>> > support direct use of the FileChannel#transferTo method over the
>> > CommunicationSpi pipe connection. It must be possible to limit the
>> > bandwidth of the cache partition file transfer connection at runtime.
>> >
>> > Checkpointer
>> >
>> > When the supplier node receives the cache partition file demand
>> > request, it will send the file over the CommunicationSpi. The cache
>> > partition file can be concurrently updated by the checkpoint thread
>> > during its transmission. To guarantee file consistency, the
>> > Checkpointer must use the copy-on-write technique and save a copy of
>> > the updated chunk into the temporary file.
>> >
>> > (new) Catch-up temporary WAL
>> >
>> > While the demander node is in the partition file transmission state, it
>> > must save all cache entries corresponding to the moving partition into
>> > a new temporary WAL storage. These entries will later be applied one
>> > by one to the received cache partition file. While the storage is being
>> > read, all asynchronous operations will be appended to the end of the
>> > temporary WAL storage until it has been fully read. A file-based FIFO
>> > approach is assumed to be used by this process.
>> >
>> >
>> > *RECOVERY*
>> >
>> > In case of crash recovery, no additional actions need to be applied to
>> > keep the cache partition file consistent. We do not recover partitions
>> > in the moving state, so only the single partition file being
>> > transferred will be lost. Its uniqueness is guaranteed by the
>> > single-file-transmission process. The cache partition file will be
>> > fully loaded on the next rebalance procedure.
>> >
>> > To provide the default cluster recovery guarantee we must:
>> > 1. Start the checkpoint process when the temporary WAL storage becomes
>> > empty;
>> > 2. Wait for the first checkpoint to end and set the owning status on
>> > the cache partition.
>> >
>> >
>> >
>> >
>> > [1] http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
>> > [2] https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
>> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
>> > [4] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
>> >
>>
>
