Eduard, Thank you very much for the discussion!
Your algorithm looks much better for me too and easier to implement. I'll update appropriate process points on IEP page of the proposed rebalance procedure. On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev <eduard.shangar...@gmail.com> wrote: > > So, after some discussion, I could describe another approach on how to > build consistent partition on the fly. > > 1. We make a checkpoint, fix the size of the partition in OffheapManager. > 2. After checkpoint finish, we start sending partition file (without any > lock) to the receiver from 0 to fixed size. > 3. Next checkpoints if they detect that they would override some pages of > transferring file should write the previous state of a page to a dedicated > file. > So, we would have a list of pages written 1 by 1, page id is written in the > page itself so we could determine page index. Let's name it log. > 4. When transfer finished checkpointer would stop updating log-file. Now we > are ready to send it to the receiver. > 5. On receiver side we start merging the dirty partition file with log > (updating it with pages from log-file). > > So, an advantage of this method: > - checkpoint-thread work couldn't increase more than twice; > - checkpoint-threads shouldn't wait for anything; > - in best case, we receive partition without any extra effort. > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev < > eduard.shangar...@gmail.com> wrote: > > > Maxim, > > > > I have looked through your algorithm of reading partition consistently. > > And I have some questions/comments. > > > > 1. The algorithm requires heavy synchronization between checkpoint-thread > > and new-approach-rebalance-threads, > > because you need strong guarantees to not start writing or reading to > > chunk which was updated or started reading by the counterpart. > > > > 2. Also, if we have started transferring this chunk in original partition > > couldn't be updated by checkpoint-threads. They should wait for transfer > > finishing. > > > > 3. If sending is slow and partition is updated then in worst case > > checkpoint-threads would create the whole copy of the partition. > > > > So, what we have: > > -on every page write checkpoint-thread should synchronize with > > new-approach-rebalance-threads; > > -checkpoint-thread should do extra-work, sometimes this could be as huge > > as copying the whole partition. > > > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev <ilya.kasnach...@gmail.com> > > wrote: > > > >> Hello! > >> > >> This proposal will also happily break my compression-with-dictionary patch > >> since it relies currently on only having local dictionaries. > >> > >> However, when you have compressed data, maybe speed boost is even greater > >> with your approach. > >> > >> Regards, > >> -- > >> Ilya Kasnacheev > >> > >> > >> пт, 23 нояб. 2018 г. в 13:08, Maxim Muzafarov <maxmu...@gmail.com>: > >> > >> > Igniters, > >> > > >> > > >> > I'd like to take the next step of increasing the Apache Ignite with > >> > enabled persistence rebalance speed. Currently, the rebalancing > >> > procedure doesn't utilize the network and storage device throughout to > >> > its full extent even with enough meaningful values of > >> > rebalanceThreadPoolSize property. As part of the previous discussion > >> > `How to make rebalance faster` [1] and IEP-16 [2] Ilya proposed an > >> > idea [3] of transferring cache partition files over the network. > >> > From my point, the case to which this type of rebalancing procedure > >> > can bring the most benefit – is adding a completely new node or set of > >> > new nodes to the cluster. Such a scenario implies fully relocation of > >> > cache partition files to the new node. To roughly estimate the > >> > superiority of partition file transmitting over the network the native > >> > Linux scp\rsync commands can be used. My test environment showed the > >> > result of the new approach as 270 MB/s vs the current 40 MB/s > >> > single-threaded rebalance speed. > >> > > >> > > >> > I've prepared the design document IEP-28 [4] and accumulated all the > >> > process details of a new rebalance approach on that page. Below you > >> > can find the most significant details of the new rebalance procedure > >> > and components of the Apache Ignite which are proposed to change. > >> > > >> > Any feedback is very appreciated. > >> > > >> > > >> > *PROCESS OVERVIEW* > >> > > >> > The whole process is described in terms of rebalancing single cache > >> > group and partition files would be rebalanced one-by-one: > >> > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to the > >> > supplier node; > >> > 2. When the supplier node receives GridDhtPartitionDemandMessage and > >> > starts the new checkpoint process; > >> > 3. The supplier node creates empty the temporary cache partition file > >> > with .tmp postfix in the same cache persistence directory; > >> > 4. The supplier node splits the whole cache partition file into > >> > virtual chunks of predefined size (multiply to the PageMemory size); > >> > 4.1. If the concurrent checkpoint thread determines the appropriate > >> > cache partition file chunk and tries to flush dirty page to the cache > >> > partition file > >> > 4.1.1. If rebalance chunk already transferred > >> > 4.1.1.1. Flush the dirty page to the file; > >> > 4.1.2. If rebalance chunk not transferred > >> > 4.1.2.1. Write this chunk to the temporary cache partition file; > >> > 4.1.2.2. Flush the dirty page to the file; > >> > 4.2. The node starts sending to the demander node each cache partition > >> > file chunk one by one using FileChannel#transferTo > >> > 4.2.1. If the current chunk was modified by checkpoint thread – read > >> > it from the temporary cache partition file; > >> > 4.2.2. If the current chunk is not touched – read it from the original > >> > cache partition file; > >> > 5. The demander node starts to listen to new pipe incoming connections > >> > from the supplier node on TcpCommunicationSpi; > >> > 6. The demander node creates the temporary cache partition file with > >> > .tmp postfix in the same cache persistence directory; > >> > 7. The demander node receives each cache partition file chunk one by one > >> > 7.1. The node checks CRC for each PageMemory in the downloaded chunk; > >> > 7.2. The node flushes the downloaded chunk at the appropriate cache > >> > partition file position; > >> > 8. When the demander node receives the whole cache partition file > >> > 8.1. The node initializes received .tmp file as its appropriate cache > >> > partition file; > >> > 8.2. Thread-per-partition begins to apply for data entries from the > >> > beginning of WAL-temporary storage; > >> > 8.3. All async operations corresponding to this partition file still > >> > write to the end of temporary WAL; > >> > 8.4. At the moment of WAL-temporary storage is ready to be empty > >> > 8.4.1. Start the first checkpoint; > >> > 8.4.2. Wait for the first checkpoint ends and own the cache partition; > >> > 8.4.3. All operations now are switched to the partition file instead > >> > of writing to the temporary WAL; > >> > 8.4.4. Schedule the temporary WAL storage deletion; > >> > 9. The supplier node deletes the temporary cache partition file; > >> > > >> > > >> > *COMPONENTS TO CHANGE* > >> > > >> > CommunicationSpi > >> > > >> > To benefit from zero copy we must delegate the file transferring to > >> > FileChannel#transferTo(long, long, > >> > java.nio.channels.WritableByteChannel) because the fast path of > >> > transferTo method is only executed if the destination buffer inherits > >> > from an internal JDK class. > >> > > >> > Preloader > >> > > >> > A new implementation of cache entries preloader assume to be done. The > >> > new implementation must send and receive cache partition files over > >> > the CommunicationSpi channels by chunks of data with validation > >> > received items. The new layer over the cache partition file must > >> > support direct using of FileChannel#transferTo method over the > >> > CommunicationSpi pipe connection. The connection bandwidth of the > >> > cache partition file transfer must have the ability to be limited at > >> > runtime. > >> > > >> > Checkpointer > >> > > >> > When the supplier node receives the cache partition file demand > >> > request it will send the file over the CommunicationSpi. The cache > >> > partition file can be concurrently updated by checkpoint thread during > >> > its transmission. To guarantee the file consistency Сheckpointer must > >> > use copy-on-write technique and save a copy of updated chunk into the > >> > temporary file. > >> > > >> > (new) Catch-up temporary WAL > >> > > >> > While the demander node is in the partition file transmission state it > >> > must save all cache entries corresponding to the moving partition into > >> > a new temporary WAL storage. These entries will be applied later one > >> > by one on the received cache partition file. All asynchronous > >> > operations will be enrolled to the end of temporary WAL storage during > >> > storage reads until it becomes fully read. The file-based FIFO > >> > approach assumes to be used by this process. > >> > > >> > > >> > *RECOVERY* > >> > > >> > In case of crash recovery, there is no additional actions need to be > >> > applied to keep the cache partition file consistency. We are not > >> > recovering partition with the moving state, thus the single partition > >> > file will be lost and only it. The uniqueness of it is guaranteed by > >> > the single-file-transmission process. The cache partition file will be > >> > fully loaded on the next rebalance procedure. > >> > > >> > To provide default cluster recovery guarantee we must to: > >> > 1. Start the checkpoint process when the temporary WAL storage becomes > >> > empty; > >> > 2. Wait for the first checkpoint ends and set owning status to the > >> > cache partition; > >> > > >> > > >> > > >> > > >> > [1] > >> > > >> http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html > >> > [2] > >> > > >> https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020 > >> > [4] > >> > > >> https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing > >> > > >> > >