So, after some discussion, let me describe another approach to building a consistent partition copy on the fly.
1. We make a checkpoint and fix the size of the partition in OffheapManager.
2. After the checkpoint finishes, we start sending the partition file (without any lock) to the receiver, from 0 to the fixed size.
3. If subsequent checkpoints detect that they would overwrite pages of the file being transferred, they write the previous state of each such page to a dedicated file. So we get pages written one by one; the page id is stored in the page itself, so we can determine the page index. Let's call this file the log.
4. When the transfer is finished, the checkpointer stops updating the log file. Now we are ready to send it to the receiver.
5. On the receiver side we merge the transferred dirty partition file with the log, overwriting its pages with the pages from the log file (a sketch of this merge step is included further down in this message).

So, the advantages of this method:
- the checkpoint-thread work cannot more than double;
- checkpoint threads don't have to wait for anything;
- in the best case, we receive the partition without any extra effort.

On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <eduard.shangar...@gmail.com> wrote:

> Maxim,
>
> I have looked through your algorithm for reading a partition consistently,
> and I have some questions/comments.
>
> 1. The algorithm requires heavy synchronization between the checkpoint
> thread and the new-approach rebalance threads, because you need strong
> guarantees not to start writing to or reading from a chunk which the
> counterpart has already updated or started reading.
>
> 2. Also, once we have started transferring a chunk, it cannot be updated
> in the original partition by checkpoint threads; they have to wait for the
> transfer to finish.
>
> 3. If sending is slow and the partition is being updated, then in the
> worst case the checkpoint threads would create a copy of the whole
> partition.
>
> So, what we have:
> - on every page write the checkpoint thread has to synchronize with the
> new-approach rebalance threads;
> - the checkpoint thread has to do extra work, which can sometimes be as
> large as copying the whole partition.
>
>
> On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <ilya.kasnach...@gmail.com>
> wrote:
>
>> Hello!
>>
>> This proposal will also happily break my compression-with-dictionary
>> patch, since it currently relies on having only local dictionaries.
>>
>> However, when the data is compressed, the speed boost from your approach
>> may be even greater.
>>
>> Regards,
>> --
>> Ilya Kasnacheev
>>
>>
>> Fri, Nov 23, 2018 at 13:08, Maxim Muzafarov <maxmu...@gmail.com>:
>>
>> > Igniters,
>> >
>> > I'd like to take the next step towards increasing the rebalance speed
>> > of Apache Ignite with persistence enabled. Currently, the rebalancing
>> > procedure doesn't utilize the network and storage device throughput to
>> > its full extent, even with reasonably large values of the
>> > rebalanceThreadPoolSize property. As part of the previous discussion
>> > `How to make rebalance faster` [1] and IEP-16 [2], Ilya proposed an
>> > idea [3] of transferring cache partition files over the network.
>> > From my point of view, the case where this type of rebalancing
>> > procedure brings the most benefit is adding a completely new node or a
>> > set of new nodes to the cluster. Such a scenario implies full
>> > relocation of cache partition files to the new node. To roughly
>> > estimate the superiority of transmitting partition files over the
>> > network, the native Linux scp/rsync commands can be used. My test
>> > environment showed 270 MB/s for the new approach vs the current
>> > 40 MB/s single-threaded rebalance speed.
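(Coming back to the log-merge approach described at the top of this message.) A minimal, non-authoritative sketch of the receiver-side merge from step 5, in plain java.nio, is below. The 4 KB page size, the page id sitting in the first 8 bytes of the page, the page index in the low 32 bits of the page id, and the first-pre-image-wins duplicate handling are all assumptions made for illustration only, not something taken from the proposal.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of step 5: merging the "dirty" transferred partition file with the
 * log of page pre-images written by the checkpoints that ran during the
 * transfer. Every pre-image carries its page id, so the target offset in the
 * partition file can be recomputed from the page itself.
 */
public class PartitionLogMerge {
    /** Assumption: configured PageMemory page size. */
    private static final int PAGE_SIZE = 4096;

    public static void merge(Path partFile, Path logFile) throws IOException {
        try (FileChannel part = FileChannel.open(partFile, StandardOpenOption.WRITE);
             FileChannel log = FileChannel.open(logFile, StandardOpenOption.READ)) {

            ByteBuffer page = ByteBuffer.allocateDirect(PAGE_SIZE);
            Set<Long> restored = new HashSet<>();

            while (readPage(log, page)) {
                page.flip();

                long pageId = page.getLong(0);        // Assumption: page id in the first 8 bytes.
                long pageIdx = pageId & 0xFFFFFFFFL;  // Assumption: page index in the low 32 bits.
                long off = pageIdx * PAGE_SIZE;

                // Only the first (oldest) pre-image of a page restores the state
                // it had at the checkpoint that fixed the partition size.
                if (restored.add(pageIdx)) {
                    while (page.hasRemaining())
                        part.write(page, off + page.position());
                }

                page.clear();
            }
        }
    }

    /** Reads exactly one page from the log, or returns false on a clean end of file. */
    private static boolean readPage(FileChannel log, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (log.read(buf) < 0) {
                if (buf.position() == 0)
                    return false;

                throw new IOException("Truncated page in the log file");
            }
        }

        return true;
    }
}

If the checkpoints that fill the log track which pages they have already saved and write each page at most once, the duplicate check on the receiver side becomes unnecessary.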
>> > I've prepared the design document IEP-28 [4] and accumulated all the
>> > process details of the new rebalance approach on that page. Below you
>> > can find the most significant details of the new rebalance procedure
>> > and the components of Apache Ignite which are proposed to change.
>> >
>> > Any feedback is very much appreciated.
>> >
>> >
>> > *PROCESS OVERVIEW*
>> >
>> > The whole process is described in terms of rebalancing a single cache
>> > group; partition files are rebalanced one by one:
>> >
>> > 1. The demander node sends the GridDhtPartitionDemandMessage to the
>> > supplier node;
>> > 2. The supplier node receives the GridDhtPartitionDemandMessage and
>> > starts a new checkpoint process;
>> > 3. The supplier node creates an empty temporary cache partition file
>> > with the .tmp postfix in the same cache persistence directory;
>> > 4. The supplier node splits the whole cache partition file into
>> > virtual chunks of a predefined size (a multiple of the PageMemory page size);
>> > 4.1. If a concurrent checkpoint thread determines which chunk a dirty
>> > page belongs to and tries to flush it to the cache partition file:
>> > 4.1.1. If the rebalance chunk has already been transferred:
>> > 4.1.1.1. Flush the dirty page to the file;
>> > 4.1.2. If the rebalance chunk has not been transferred yet:
>> > 4.1.2.1. Write this chunk to the temporary cache partition file;
>> > 4.1.2.2. Flush the dirty page to the file;
>> > 4.2. The node starts sending each cache partition file chunk to the
>> > demander node, one by one, using FileChannel#transferTo:
>> > 4.2.1. If the current chunk was modified by a checkpoint thread, read
>> > it from the temporary cache partition file;
>> > 4.2.2. If the current chunk has not been touched, read it from the
>> > original cache partition file;
>> > 5. The demander node starts listening for new incoming pipe
>> > connections from the supplier node on TcpCommunicationSpi;
>> > 6. The demander node creates a temporary cache partition file with the
>> > .tmp postfix in the same cache persistence directory;
>> > 7. The demander node receives each cache partition file chunk one by one:
>> > 7.1. The node checks the CRC of each PageMemory page in the downloaded chunk;
>> > 7.2. The node flushes the downloaded chunk at the appropriate cache
>> > partition file position;
>> > 8. When the demander node has received the whole cache partition file:
>> > 8.1. The node initializes the received .tmp file as the appropriate
>> > cache partition file;
>> > 8.2. A thread per partition begins to apply data entries from the
>> > beginning of the temporary WAL storage;
>> > 8.3. All async operations corresponding to this partition file still
>> > write to the end of the temporary WAL;
>> > 8.4. At the moment the temporary WAL storage becomes empty:
>> > 8.4.1. Start the first checkpoint;
>> > 8.4.2. Wait for the first checkpoint to end and own the cache partition;
>> > 8.4.3. All operations are now switched to the partition file instead
>> > of writing to the temporary WAL;
>> > 8.4.4. Schedule the temporary WAL storage for deletion;
>> > 9. The supplier node deletes the temporary cache partition file;
>> >
>> >
>> > *COMPONENTS TO CHANGE*
>> >
>> > CommunicationSpi
>> >
>> > To benefit from zero copy we must delegate the file transfer to
>> > FileChannel#transferTo(long, long,
>> > java.nio.channels.WritableByteChannel), because the fast path of the
>> > transferTo method is only executed if the destination channel inherits
>> > from an internal JDK class.
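To illustrate steps 4.2.1/4.2.2 together with the zero-copy requirement above, here is a rough sketch of a supplier-side send loop. The 16 MB chunk size, the BitSet bookkeeping and the assumption that the temporary file mirrors the chunk offsets of the original partition file are illustrative guesses, not part of IEP-28.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.BitSet;

/**
 * Sketch of the supplier-side send loop (steps 4.2.1 / 4.2.2): every chunk is
 * pushed with FileChannel#transferTo, reading either from the original
 * partition file or, if the checkpointer has already preserved the chunk
 * before overwriting it, from the temporary copy-on-write file.
 */
public class PartitionChunkSender {
    /** Assumption: chunk size, a multiple of the PageMemory page size. */
    private static final long CHUNK_SIZE = 16L * 1024 * 1024;

    public static void send(Path partFile, Path tmpFile, BitSet copiedChunks, SocketChannel out)
        throws IOException {
        try (FileChannel part = FileChannel.open(partFile, StandardOpenOption.READ);
             FileChannel tmp = FileChannel.open(tmpFile, StandardOpenOption.READ)) {

            long size = part.size(); // Fixed at the checkpoint that started the transfer.

            for (long pos = 0; pos < size; pos += CHUNK_SIZE) {
                int chunkIdx = (int)(pos / CHUNK_SIZE);
                long len = Math.min(CHUNK_SIZE, size - pos);

                // 4.2.1 / 4.2.2: choose where to read the chunk from.
                FileChannel src = copiedChunks.get(chunkIdx) ? tmp : part;

                // transferTo may transfer fewer bytes than requested, so loop.
                long sent = 0;

                while (sent < len)
                    sent += src.transferTo(pos + sent, len - sent, out);
            }
        }
    }
}

In a real implementation the chunk state would have to be published by the checkpointer under the same synchronization the sender uses, otherwise a chunk could be read while its copy is still being written.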
>> > Preloader
>> >
>> > A new implementation of the cache entries preloader needs to be done.
>> > The new implementation must send and receive cache partition files
>> > over the CommunicationSpi channels in chunks of data, with validation
>> > of the received items. The new layer over the cache partition file
>> > must support direct use of the FileChannel#transferTo method over the
>> > CommunicationSpi pipe connection. It must be possible to limit the
>> > connection bandwidth of the cache partition file transfer at runtime.
>> >
>> > Checkpointer
>> >
>> > When the supplier node receives the cache partition file demand
>> > request, it will send the file over the CommunicationSpi. The cache
>> > partition file can be concurrently updated by a checkpoint thread
>> > during its transmission. To guarantee file consistency, the
>> > Checkpointer must use a copy-on-write technique and save a copy of the
>> > updated chunk into the temporary file.
>> >
>> > (new) Catch-up temporary WAL
>> >
>> > While the demander node is in the partition file transmission state,
>> > it must save all cache entries corresponding to the moving partition
>> > into a new temporary WAL storage. These entries will be applied later,
>> > one by one, to the received cache partition file. All asynchronous
>> > operations will be appended to the end of the temporary WAL storage
>> > while it is being read, until it has been fully read. A file-based
>> > FIFO approach is assumed to be used by this process.
>> >
>> >
>> > *RECOVERY*
>> >
>> > In case of crash recovery, no additional actions need to be taken to
>> > keep the cache partition file consistent. We do not recover a
>> > partition in the moving state, so only that single partition file will
>> > be lost. This uniqueness is guaranteed by the single-file-transmission
>> > process. The cache partition file will be fully loaded on the next
>> > rebalance procedure.
>> >
>> > To provide the default cluster recovery guarantees we must:
>> > 1. Start the checkpoint process when the temporary WAL storage becomes empty;
>> > 2. Wait for the first checkpoint to end and set the owning status on
>> > the cache partition;
>> >
>> >
>> > [1] http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
>> > [2] https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
>> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
>> > [4] https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
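For completeness, here is a sketch of the other side of the same copy-on-write technique, i.e. what a checkpoint thread could do before flushing a dirty page (steps 4.1.1 - 4.1.2.2 and the Checkpointer section quoted above). The chunk-size constant, the BitSet bookkeeping and the coarse synchronized method are, again, assumptions for illustration only, not the actual proposed implementation.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.BitSet;

/**
 * Sketch of the checkpointer side of the copy-on-write technique: before a
 * dirty page is flushed into a chunk that has not been transferred yet, the
 * current on-disk content of that chunk is saved once into the temporary
 * file, so the sender can still stream a consistent chunk from there.
 */
public class CopyOnWriteCheckpointWriter {
    /** Assumption: chunk size, a multiple of the PageMemory page size. */
    private static final long CHUNK_SIZE = 16L * 1024 * 1024;

    private final FileChannel part;        // Original cache partition file.
    private final FileChannel tmp;         // Temporary .tmp copy-on-write file.
    private final BitSet transferred;      // Chunks already sent to the demander.
    private final BitSet copied = new BitSet(); // Chunks already saved to the temporary file.

    public CopyOnWriteCheckpointWriter(FileChannel part, FileChannel tmp, BitSet transferred) {
        this.part = part;
        this.tmp = tmp;
        this.transferred = transferred;
    }

    /** Flushes a dirty page at the given partition file offset, preserving its chunk first if needed. */
    public synchronized void writePage(ByteBuffer page, long pageOff) throws IOException {
        int chunkIdx = (int)(pageOff / CHUNK_SIZE);

        // 4.1.2.1: the chunk is neither sent nor preserved yet - copy it before touching it.
        if (!transferred.get(chunkIdx) && !copied.get(chunkIdx)) {
            long chunkOff = chunkIdx * CHUNK_SIZE;
            long len = Math.min(CHUNK_SIZE, part.size() - chunkOff);

            ByteBuffer chunk = ByteBuffer.allocateDirect((int)len);

            while (chunk.hasRemaining()) {
                if (part.read(chunk, chunkOff + chunk.position()) < 0)
                    break; // Unexpected end of file: keep what was read.
            }

            chunk.flip();

            while (chunk.hasRemaining())
                tmp.write(chunk, chunkOff + chunk.position());

            copied.set(chunkIdx);
        }

        // 4.1.1.1 / 4.1.2.2: flush the dirty page to the original partition file.
        while (page.hasRemaining())
            part.write(page, pageOff + page.position());
    }
}

The point of the "copied" set is that every chunk is preserved at most once, so the extra checkpoint work is bounded by one additional copy of the partition in the worst case, which matches the concern raised earlier in this thread.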