Maxim,

Thank you for the excellent analysis! From the profiling data I see the
following:
1) Almost no parallelism - one rebalance thread is used (default), two
responses are sent per single demand request (default)
2) All system resources are underutilized - CPU, disk, network
3) Huge hotspot on free lists

In general I would recommend considering the following points during
further rebalance optimization:
1) Start with the fact that rebalance always causes system degradation due
to the additional hardware resources it requires. Different deployments may
require different degradation modes. Sometimes "soft" mode is preferable -
long rebalance with low system overhead. This is what we see now. Sometimes
the opposite - as short a rebalance as possible at the cost of severe
degradation in operations. Sometimes - something in the middle. Every
optimization we make should have a clear explanation of how the system
degrades.
2) We need to investigate the hotspot on free lists. It looks like this is
the main limiting factor for now. Alex, do you have any idea what this is?
Is it possible to bypass free lists completely during rebalance at the cost
of higher data fragmentation during concurrent updates?
3) We need to investigate a streaming rebalance mode, where the supplier
constantly streams data to the demander, similarly to our data streamer. It
should be fairly easy to implement, applicable to all modes, and may
speed up rebalance by up to 5-10 times. The great thing about this approach
is that it will allow users to choose easily between system stress level
and rebalance throughput.
4) File transfer rebalance: we need a clear design of failure and
concurrency cases and degradation modes. Several questions to answer:
4.1) What would happen if another rebalance starts when the previous one
is not finished yet?
4.2) What would happen if the supplier or demander fails in the middle?
What kind of cleanup would be required?
4.3) Degradation: what kind of problems should users expect due to massive
disk and network load during file transfer and due to data merging on
the demander side?
4.4) Degradation: how would secondary indexes be rebuilt on the demander
side? Note that until the indexes are ready the node is not operational and
cannot become a partition owner, and index rebuild is essentially a full
data rescan with potentially the same issues with slow updates of
persistent data structures that we have now.

Vladimir.

On Fri, Dec 7, 2018 at 3:32 PM Maxim Muzafarov <maxmu...@gmail.com> wrote:

> Vladimir,
>
>
> Let me propose that we consider the whole rebalance process as having
> three strategies:
> - The classical message-based approach, preferable for in-memory
> caches;
> - Historical rebalance based on WAL, used for rebalancing deltas of
> persisted caches;
> - (new) File-based rebalance (current IEP-28), used for relocation of
> full cache partitions.
>
>
> First of all, I want to show you that for full cache relocation the
> file-based rebalancing strategy has, from my point of view, a set of
> advantages over the message-based approach. Let's also assume that the
> time spent on WAL logging during the rebalance procedure is already
> optimized (we are not taking it into account at all).
>
> According to preliminary measurements [8] and the message above, we
> spend more than 65% of rebalancing time on creating K-V cache pairs for
> preloading entries and on supporting internal data structures. This is
> true both for an in-memory cluster configuration and for a cluster with
> persistence enabled. It is also true that these data structures can be
> used more efficiently by implementing batch entry processing for them.
> And it should be done (a ticket for it is already created [3]).
>
> Let's take a closer look at a simple example.
>
> I've collected some information about a cache on my stress-testing cluster:
> partitions (total): 65534
> single partition size: 437 MB
> rebalance batch: 512 KB
> batches per partition: 874
> partitions per node: 606
> batches per node: 529644
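
The batch counts above follow directly from the partition size and the
batch size. A minimal sketch of the arithmetic, assuming 1 MB = 1024 KB;
the class and method names are made up for illustration and are not part
of Ignite:

```java
/** Hypothetical helper that reproduces the batch counts quoted above. */
final class RebalanceBatchMath {
    /** Number of batches needed to move one partition, rounded up. */
    static long batchesPerPartition(long partitionSizeKb, long batchSizeKb) {
        return (partitionSizeKb + batchSizeKb - 1) / batchSizeKb;
    }

    public static void main(String[] args) {
        long partitionSizeKb = 437L * 1024; // 437 MB, assuming 1 MB = 1024 KB
        long batchSizeKb = 512;             // rebalance batch size from the list
        long partitionsPerNode = 606;       // partitions per node from the list

        long perPartition = batchesPerPartition(partitionSizeKb, batchSizeKb);
        long perNode = perPartition * partitionsPerNode;

        System.out.println("batches per partition: " + perPartition); // 874
        System.out.println("batches per node: " + perNode);           // 529644
    }
}
```
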
>
> Let's assume that we've already implemented batched entry processing
> and we perform bulk operations over internal data structures. Even
> under these assumptions, we still need to process 874 batches for
> each cache partition to transfer data. It will cost us up to 15 seconds
> per partition file, a lot of CPU cycles to maintain internal data
> structures, and will block other threads for a while as they wait for
> the database checkpoint lock to be released.
>
> Increasing the rebalance batch size is not efficient here because we
> start to hold the database lock for too long. It will lead to
> thread starvation and will only slow down the whole rebalance.
> Just like increasing the batch size, making the rebalance thread
> pool bigger can lead to a cluster performance drop for almost the
> same reasons.
>
> I think that for huge caches the file-based rebalance can provide us
> (compared to batch processing) with:
>  - a fair non-blocking approach in each part of the rebalancing procedure;
>  - a reduced number of locks being acquired (the other threads can
> make more progress);
>  - zero-copy transmission on the supplier, which saves CPU cycles and
> memory bandwidth;
>  - as a result, a transferable batch size increased up to the whole
> partition file size.
>
> SUMMARY TO DO
>
> The plan and other ideas (without risk evaluation):
>
> Message-based approach.
> Optimizations to do, by priority: [3], [2] and maybe [9].
>
> Historical rebalance based on WAL.
> I suppose there is nothing to do here, as Sergey is already working on
> the issue [1] with turning off WAL.
>
> (new) Full cache data relocation.
> Prototyping current IEP-28.
>
> I think another approach can also be implemented.
> During the rebalance procedure we can write entries to data pages
> directly, skipping free lists, the PK index and secondary indexes. Once
> the partition preloading is finished, we will rebuild the free list and
> all indexes.
> Will it work for us?
>
> ANSWERS
>
> > 1) Is it correct that supplier sends only one message for every demand
> > message? If yes, then streaming should improve network utilization a lot.
>
> I think we already have such an ability in Apache Ignite (not
> exactly streaming). The CacheConfiguration#rebalanceBatchesPrefetchCnt
> can be used here to reduce the system delay between the send/receive
> message steps. The default value is more than enough for most
> cases. The testing results showed only a 7 second (0.32%) delay during
> the 40 min cache rebalance procedure. So, each supply message is ready
> to be sent when the next demand message arrives.
>
>
> > 2) Is it correct that for user caches we process supply messages in a
> > system pool? Did we consider moving it to striped pool? Because if all
> > operations on a single partition is ordered, we may apply a number of
> > critical optimizations - bypassing page cache and checkpointer for new
> > entries, batched index updates, batched free list updates, etc.
>
> I think the rebalance procedure should not cause thousands of messages
> per second, so we don't need to move the rebalance procedure to the
> striped pool. We should have a limited, stable load from the rebalancing
> procedure on the cluster. As for the second part, are you talking
> about the thread-per-partition model? If yes, we have tickets for it [4],
> [5], [6].
>
> > 3) Seems that WAL should no longer be a problem for us [1]. What are
> > exact conditions when it could be disabled on supplier side?
>
> Do you mean the demander side? Why should we try to disable it on the
> supplier node? I do not take it into account at all because it can be
> easily done (I suppose issue [1] is about it). But it doesn't help us
> much for full cache relocations.
>
> > 4) Most important - have we tried to profile plain single-threaded
> > rebalance without concurrent write load? We need to have clear
> > understanding on where time is spent - supplier/demander,
> > cpu/network/disk, etc. Some Java tracing code should help.
>
> I've updated the information about profiling results on the
> confluence page [8]. If you find that I've missed something or that
> some information is unclear, please let me know and I will fix it.
>
> > And one question regarding proposed implementation - how are we going to
> > handle secondary indexes?
>
> Thank you for pointing this out. Actually, the current IEP page
> doesn't cover this case. I think we can schedule an index rebuild after
> all partition files have been transferred. This approach was also
> mentioned in issue [2].
> Would that be correct?
>
>
> [1] https://issues.apache.org/jira/browse/IGNITE-10505
> [2] https://issues.apache.org/jira/browse/IGNITE-7934
> [3] https://issues.apache.org/jira/browse/IGNITE-7935
>
> [4] https://issues.apache.org/jira/browse/IGNITE-4682
> [5] https://issues.apache.org/jira/browse/IGNITE-4506
> [6] https://issues.apache.org/jira/browse/IGNITE-4680
>
> [7] https://issues.apache.org/jira/browse/IGNITE-7027
> [8] https://cwiki.apache.org/confluence/display/IGNITE/Rebalance+peer-2-peer
> [9] https://issues.apache.org/jira/browse/IGNITE-9520
> On Wed, 28 Nov 2018 at 23:00, Vladimir Ozerov <voze...@gridgain.com>
> wrote:
> >
> > Maxim,
> >
> > Regarding MVCC - this is essentially a copy-on-write approach. New
> > entries are created on every update. They are cleaned asynchronously by
> > dedicated threads (aka "vacuum").
> >
> > I looked at the document you mentioned, thank you for pointing to it. But
> > it doesn't answer all questions around the existing design, and what I am
> > trying to do is to find out how deeply we understand the current
> > problems. It is very true that various subsystems, such as buffer
> > managers, WALs, supporting structures, etc. incur very serious overhead.
> > And when it comes to heavy operations, implementors typically look for a
> > way to bypass as many components as possible, taking into account that
> > different shortcuts lead to different types of side effects. And IMO our
> > very important goal for now is to outline the space of possible
> > improvements, and estimate their costs, risks and applicability for the
> > product's configuration space.
> >
> > Let me clarify several questions about the current rebalance
> > implementation, as I am not a big expert here.
> > 1) Is it correct that supplier sends only one message for every demand
> > message? If yes, then streaming should improve network utilization a lot.
> > 2) Is it correct that for user caches we process supply messages in a
> > system pool? Did we consider moving it to striped pool? Because if all
> > operations on a single partition is ordered, we may apply a number of
> > critical optimizations - bypassing page cache and checkpointer for new
> > entries, batched index updates, batched free list updates, etc.
> > 3) Seems that WAL should no longer be a problem for us [1]. What are
> > exact conditions when it could be disabled on supplier side?
> > 4) Most important - have we tried to profile plain single-threaded
> > rebalance without concurrent write load? We need to have clear
> > understanding on where time is spent - supplier/demander,
> > cpu/network/disk, etc. Some Java tracing code should help.
> >
> > And one question regarding proposed implementation - how are we going to
> > handle secondary indexes?
> >
> > [1] https://issues.apache.org/jira/browse/IGNITE-8017
> >
> >
> > On Wed, Nov 28, 2018 at 6:43 PM Maxim Muzafarov <maxmu...@gmail.com>
> > wrote:
> >
> > > Eduard,
> > >
> > > Thank you very much for the discussion!
> > >
> > > Your algorithm looks much better to me too, and easier to implement.
> > > I'll update the appropriate process points of the proposed
> > > rebalance procedure on the IEP page.
> > > On Tue, 27 Nov 2018 at 18:52, Eduard Shangareev
> > > <eduard.shangar...@gmail.com> wrote:
> > > >
> > > > So, after some discussion, I can describe another approach to
> > > > building a consistent partition on the fly.
> > > >
> > > > 1. We make a checkpoint and fix the size of the partition in
> > > > OffheapManager.
> > > > 2. After the checkpoint finishes, we start sending the partition file
> > > > (without any lock) to the receiver, from 0 to the fixed size.
> > > > 3. If subsequent checkpoints detect that they would overwrite some
> > > > pages of the file being transferred, they should write the previous
> > > > state of each such page to a dedicated file.
> > > > So, we would have a list of pages written one by one; the page id is
> > > > written in the page itself, so we can determine the page index. Let's
> > > > call it the log.
> > > > 4. When the transfer finishes, the checkpointer stops updating the
> > > > log file. Now we are ready to send it to the receiver.
> > > > 5. On the receiver side we start merging the dirty partition file with
> > > > the log (updating it with pages from the log file).
> > > >
> > > > So, the advantages of this method:
> > > > - checkpoint-thread work cannot increase more than twofold;
> > > > - checkpoint threads don't have to wait for anything;
> > > > - in the best case, we receive the partition without any extra effort.
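
The receiver-side merge in step 5 of the algorithm above could be
sketched as follows. This is a minimal in-memory illustration, not Ignite
code: the 16-byte page, the layout with the page index in the first 4
bytes, and all names are assumptions. It also assumes that when a page
appears in the log more than once, the earliest record is the state at
the initial checkpoint, so the first record wins.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: restore logged page pre-images into a received partition image. */
final class PartitionLogMerge {
    static final int PAGE_SIZE = 16; // tiny page for the demo only

    /** Assumed layout: the first 4 bytes of every page hold its index. */
    static int pageIndex(byte[] buf, int off) {
        return ByteBuffer.wrap(buf, off, 4).getInt();
    }

    /** Builds a demo page with the given index and payload fill byte. */
    static byte[] page(int idx, byte fill) {
        byte[] p = new byte[PAGE_SIZE];
        Arrays.fill(p, fill);
        ByteBuffer.wrap(p).putInt(idx); // overwrite the header with the index
        return p;
    }

    /** Overwrites pages of the "dirty" partition image with pre-images from the log. */
    static void merge(byte[] partition, byte[] log) {
        Set<Integer> restored = new HashSet<>();
        for (int off = 0; off + PAGE_SIZE <= log.length; off += PAGE_SIZE) {
            int idx = pageIndex(log, off);
            // Assumption: only the earliest logged pre-image of a page is applied.
            if (restored.add(idx))
                System.arraycopy(log, off, partition, idx * PAGE_SIZE, PAGE_SIZE);
        }
    }

    public static void main(String[] args) {
        byte[] part = new byte[3 * PAGE_SIZE];
        for (int i = 0; i < 3; i++)
            System.arraycopy(page(i, (byte) 9), 0, part, i * PAGE_SIZE, PAGE_SIZE);

        byte[] log = page(1, (byte) 2); // pre-image of page 1
        merge(part, log);

        System.out.println("page 1 payload byte: " + part[PAGE_SIZE + 4]); // 2
    }
}
```
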
> > > >
> > > >
> > > > On Mon, Nov 26, 2018 at 8:54 PM Eduard Shangareev <
> > > > eduard.shangar...@gmail.com> wrote:
> > > >
> > > > > Maxim,
> > > > >
> > > > > I have looked through your algorithm for reading a partition
> > > > > consistently. And I have some questions/comments.
> > > > >
> > > > > 1. The algorithm requires heavy synchronization between the
> > > > > checkpoint-thread and new-approach-rebalance-threads,
> > > > > because you need strong guarantees not to start writing to or
> > > > > reading from a chunk which was updated or is being read by the
> > > > > counterpart.
> > > > >
> > > > > 2. Also, once we have started transferring a chunk, it cannot be
> > > > > updated in the original partition by checkpoint-threads. They
> > > > > should wait for the transfer to finish.
> > > > >
> > > > > 3. If sending is slow and the partition is being updated, then in
> > > > > the worst case checkpoint-threads would create a whole copy of the
> > > > > partition.
> > > > >
> > > > > So, what we have:
> > > > > - on every page write the checkpoint-thread should synchronize with
> > > > > new-approach-rebalance-threads;
> > > > > - the checkpoint-thread should do extra work, which sometimes could
> > > > > be as much as copying the whole partition.
> > > > >
> > > > >
> > > > > On Fri, Nov 23, 2018 at 2:55 PM Ilya Kasnacheev
> > > > > <ilya.kasnach...@gmail.com> wrote:
> > > > >
> > > > >> Hello!
> > > > >>
> > > > > >> This proposal will also happily break my
> > > > > >> compression-with-dictionary patch, since it currently relies on
> > > > > >> only having local dictionaries.
> > > > > >>
> > > > > >> However, when you have compressed data, maybe the speed boost is
> > > > > >> even greater with your approach.
> > > > >>
> > > > >> Regards,
> > > > >> --
> > > > >> Ilya Kasnacheev
> > > > >>
> > > > >>
> > > > > >> Fri, 23 Nov 2018 at 13:08, Maxim Muzafarov <maxmu...@gmail.com>:
> > > > >>
> > > > >> > Igniters,
> > > > >> >
> > > > >> >
> > > > > >> > I'd like to take the next step in increasing the rebalance speed
> > > > > >> > of Apache Ignite with persistence enabled. Currently, the
> > > > > >> > rebalancing procedure doesn't utilize the network and storage
> > > > > >> > device throughput to its full extent, even with sufficiently
> > > > > >> > large values of the rebalanceThreadPoolSize property. As part of
> > > > > >> > the previous discussion `How to make rebalance faster` [1] and
> > > > > >> > IEP-16 [2], Ilya proposed an idea [3] of transferring cache
> > > > > >> > partition files over the network.
> > > > > >> > From my point of view, the case where this type of rebalancing
> > > > > >> > procedure can bring the most benefit is adding a completely new
> > > > > >> > node or set of new nodes to the cluster. Such a scenario implies
> > > > > >> > full relocation of cache partition files to the new node. To
> > > > > >> > roughly estimate the superiority of transmitting partition files
> > > > > >> > over the network, the native Linux scp/rsync commands can be
> > > > > >> > used. My test environment showed the result of the new approach
> > > > > >> > as 270 MB/s vs the current 40 MB/s single-threaded rebalance
> > > > > >> > speed.
> > > > >> >
> > > > >> >
> > > > > >> > I've prepared the design document IEP-28 [4] and accumulated all
> > > > > >> > the process details of the new rebalance approach on that page.
> > > > > >> > Below you can find the most significant details of the new
> > > > > >> > rebalance procedure and the components of Apache Ignite which
> > > > > >> > are proposed to change.
> > > > >> >
> > > > >> > Any feedback is very appreciated.
> > > > >> >
> > > > >> >
> > > > >> > *PROCESS OVERVIEW*
> > > > >> >
> > > > > >> > The whole process is described in terms of rebalancing a single
> > > > > >> > cache group; partition files are rebalanced one by one:
> > > > > >> >
> > > > > >> > 1. The demander node sends the GridDhtPartitionDemandMessage to
> > > > > >> > the supplier node;
> > > > > >> > 2. The supplier node receives the GridDhtPartitionDemandMessage
> > > > > >> > and starts a new checkpoint process;
> > > > > >> > 3. The supplier node creates an empty temporary cache partition
> > > > > >> > file with the .tmp postfix in the same cache persistence
> > > > > >> > directory;
> > > > > >> > 4. The supplier node splits the whole cache partition file into
> > > > > >> > virtual chunks of a predefined size (a multiple of the PageMemory
> > > > > >> > size);
> > > > > >> > 4.1. If a concurrent checkpoint thread determines the appropriate
> > > > > >> > cache partition file chunk and tries to flush a dirty page to the
> > > > > >> > cache partition file
> > > > > >> > 4.1.1. If the rebalance chunk is already transferred
> > > > > >> > 4.1.1.1. Flush the dirty page to the file;
> > > > > >> > 4.1.2. If the rebalance chunk is not transferred
> > > > > >> > 4.1.2.1. Write this chunk to the temporary cache partition file;
> > > > > >> > 4.1.2.2. Flush the dirty page to the file;
> > > > > >> > 4.2. The node starts sending each cache partition file chunk to
> > > > > >> > the demander node one by one using FileChannel#transferTo
> > > > > >> > 4.2.1. If the current chunk was modified by the checkpoint thread,
> > > > > >> > read it from the temporary cache partition file;
> > > > > >> > 4.2.2. If the current chunk is not touched, read it from the
> > > > > >> > original cache partition file;
> > > > > >> > 5. The demander node starts to listen for new incoming pipe
> > > > > >> > connections from the supplier node on TcpCommunicationSpi;
> > > > > >> > 6. The demander node creates the temporary cache partition file
> > > > > >> > with the .tmp postfix in the same cache persistence directory;
> > > > > >> > 7. The demander node receives each cache partition file chunk one
> > > > > >> > by one
> > > > > >> > 7.1. The node checks the CRC for each PageMemory in the downloaded
> > > > > >> > chunk;
> > > > > >> > 7.2. The node flushes the downloaded chunk at the appropriate
> > > > > >> > cache partition file position;
> > > > > >> > 8. When the demander node receives the whole cache partition file
> > > > > >> > 8.1. The node initializes the received .tmp file as its
> > > > > >> > appropriate cache partition file;
> > > > > >> > 8.2. Thread-per-partition begins to apply data entries from the
> > > > > >> > beginning of the temporary WAL storage;
> > > > > >> > 8.3. All async operations corresponding to this partition file
> > > > > >> > still write to the end of the temporary WAL;
> > > > > >> > 8.4. At the moment the temporary WAL storage is about to become
> > > > > >> > empty
> > > > > >> > 8.4.1. Start the first checkpoint;
> > > > > >> > 8.4.2. Wait for the first checkpoint to end and own the cache
> > > > > >> > partition;
> > > > > >> > 8.4.3. All operations are now switched to the partition file
> > > > > >> > instead of writing to the temporary WAL;
> > > > > >> > 8.4.4. Schedule the temporary WAL storage deletion;
> > > > > >> > 9. The supplier node deletes the temporary cache partition file;
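
The bookkeeping behind steps 4.1 and 4.2 of the overview above could be
sketched like this. It is only an illustration under assumptions: the
class and method names are hypothetical, a chunk is copied to the .tmp
file at most once (on the first flush that touches it), and the sketch
deliberately leaves out the synchronization of the actual copy and flush
I/O between the checkpoint and rebalance threads.

```java
import java.util.BitSet;

/** Hypothetical per-partition chunk state shared by checkpointer and sender. */
final class ChunkTracker {
    private final BitSet transferred = new BitSet(); // chunks already sent
    private final BitSet copied = new BitSet();      // chunks saved to the .tmp file

    /**
     * Checkpointer side (step 4.1): returns true if the chunk must be copied
     * to the temporary file before the dirty page is flushed.
     */
    synchronized boolean needsCopyBeforeFlush(int chunk) {
        if (transferred.get(chunk) || copied.get(chunk))
            return false; // 4.1.1, or the original chunk is already preserved
        copied.set(chunk); // 4.1.2.1: caller now writes the chunk to .tmp
        return true;
    }

    /** Sender side (steps 4.2.1 and 4.2.2): where to read the chunk from. */
    synchronized boolean readFromTemp(int chunk) {
        return copied.get(chunk);
    }

    /** Sender side: called once the chunk has been sent to the demander. */
    synchronized void markTransferred(int chunk) {
        transferred.set(chunk);
    }

    public static void main(String[] args) {
        ChunkTracker t = new ChunkTracker();
        t.markTransferred(0);
        System.out.println(t.needsCopyBeforeFlush(0)); // false: already sent
        System.out.println(t.needsCopyBeforeFlush(1)); // true: preserve first
        System.out.println(t.readFromTemp(1));         // true: send the .tmp copy
    }
}
```
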
> > > > >> >
> > > > >> >
> > > > >> > *COMPONENTS TO CHANGE*
> > > > >> >
> > > > >> > CommunicationSpi
> > > > >> >
> > > > > >> > To benefit from zero copy we must delegate the file transfer to
> > > > > >> > FileChannel#transferTo(long, long,
> > > > > >> > java.nio.channels.WritableByteChannel) because the fast path of
> > > > > >> > the transferTo method is only executed if the destination
> > > > > >> > channel inherits from an internal JDK class.
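
A minimal sketch of a chunked FileChannel#transferTo loop, using only the
JDK. The class name, method name and chunk size are made up for
illustration, and a second FileChannel stands in for the socket channel
that a real CommunicationSpi connection would provide.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sender that pushes a partition file to a channel chunk by chunk. */
final class PartitionFileSender {
    /** Sends the whole file and returns the number of bytes transferred. */
    static long send(Path partFile, WritableByteChannel target, long chunkSize)
        throws IOException {
        try (FileChannel src = FileChannel.open(partFile, StandardOpenOption.READ)) {
            long size = src.size();
            long pos = 0;
            while (pos < size) {
                // transferTo may move fewer bytes than requested, so advance
                // by the returned count rather than by the chunk size.
                pos += src.transferTo(pos, Math.min(chunkSize, size - pos), target);
            }
            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("part-", ".bin");
        Path dst = Files.createTempFile("part-", ".tmp");
        try {
            Files.write(src, new byte[3 * 4096 + 100]);
            try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                System.out.println("sent bytes: " + send(src, out, 4096));
            }
        } finally {
            Files.deleteIfExists(src);
            Files.deleteIfExists(dst);
        }
    }
}
```

With a non-blocking socket channel the call can return 0, so a real
sender would also need to wait for the channel to become writable.
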
> > > > >> >
> > > > >> > Preloader
> > > > >> >
> > > > > >> > A new implementation of the cache entries preloader is assumed to
> > > > > >> > be done. The new implementation must send and receive cache
> > > > > >> > partition files over the CommunicationSpi channels in chunks of
> > > > > >> > data, with validation of received items. The new layer over the
> > > > > >> > cache partition file must support direct use of the
> > > > > >> > FileChannel#transferTo method over the CommunicationSpi pipe
> > > > > >> > connection. It must be possible to limit the connection bandwidth
> > > > > >> > of the cache partition file transfer at runtime.
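
One possible shape for a bandwidth limit that can be changed at runtime
is a per-chunk pause calculation. This is a sketch only; the class and
its methods are hypothetical, and the caller is assumed to measure how
long each chunk took to send and to sleep for the returned time.

```java
/** Hypothetical per-chunk throttle with a limit that can be changed at runtime. */
final class TransferRateLimiter {
    /** Bytes per second; volatile so another thread can change it mid-transfer. */
    private volatile long maxBytesPerSec;

    TransferRateLimiter(long maxBytesPerSec) {
        this.maxBytesPerSec = maxBytesPerSec;
    }

    /** A non-positive value means "unlimited" in this sketch. */
    void setMaxBytesPerSec(long maxBytesPerSec) {
        this.maxBytesPerSec = maxBytesPerSec;
    }

    /**
     * Nanoseconds the sender should pause after a chunk so that the chunk's
     * effective rate does not exceed the limit. Intended for chunk-sized
     * inputs; very large byte counts would overflow the multiplication.
     */
    long pauseNanos(long chunkBytes, long spentNanos) {
        long limit = maxBytesPerSec;
        if (limit <= 0)
            return 0;
        long minNanos = chunkBytes * 1_000_000_000L / limit;
        return Math.max(0, minNanos - spentNanos);
    }

    public static void main(String[] args) {
        TransferRateLimiter limiter = new TransferRateLimiter(1_000_000); // 1 MB/s
        // A 500 KB chunk that took 100 ms must be followed by a 400 ms pause.
        System.out.println(limiter.pauseNanos(500_000, 100_000_000L)); // 400000000
        limiter.setMaxBytesPerSec(0); // lift the limit while running
        System.out.println(limiter.pauseNanos(500_000, 100_000_000L)); // 0
    }
}
```
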
> > > > >> >
> > > > >> > Checkpointer
> > > > >> >
> > > > > >> > When the supplier node receives the cache partition file demand
> > > > > >> > request it will send the file over the CommunicationSpi. The
> > > > > >> > cache partition file can be concurrently updated by the
> > > > > >> > checkpoint thread during its transmission. To guarantee file
> > > > > >> > consistency, the Checkpointer must use the copy-on-write
> > > > > >> > technique and save a copy of the updated chunk into the temporary
> > > > > >> > file.
> > > > >> >
> > > > >> > (new) Catch-up temporary WAL
> > > > >> >
> > > > > >> > While the demander node is in the partition file transmission
> > > > > >> > state it must save all cache entries corresponding to the moving
> > > > > >> > partition into a new temporary WAL storage. These entries will be
> > > > > >> > applied later one by one to the received cache partition file.
> > > > > >> > All asynchronous operations will be appended to the end of the
> > > > > >> > temporary WAL storage while it is being read, until it has been
> > > > > >> > fully read. A file-based FIFO approach is assumed to be used by
> > > > > >> > this process.
> > > > >> >
> > > > >> >
> > > > >> > *RECOVERY*
> > > > >> >
> > > > > >> > In case of crash recovery, no additional actions need to be
> > > > > >> > applied to keep the cache partition file consistent. We do not
> > > > > >> > recover partitions in the moving state, thus a single partition
> > > > > >> > file will be lost, and only it. Its uniqueness is guaranteed by
> > > > > >> > the single-file-transmission process. The cache partition file
> > > > > >> > will be fully loaded on the next rebalance procedure.
> > > > > >> >
> > > > > >> > To provide the default cluster recovery guarantee we must:
> > > > > >> > 1. Start the checkpoint process when the temporary WAL storage
> > > > > >> > becomes empty;
> > > > > >> > 2. Wait for the first checkpoint to end and set the owning status
> > > > > >> > on the cache partition;
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > [1]
> > > > >> >
> > > > >>
> > >
> http://apache-ignite-developers.2346864.n4.nabble.com/Rebalancing-how-to-make-it-faster-td28457.html
> > > > >> > [2]
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-16%3A+Optimization+of+rebalancing
> > > > >> > [3] https://issues.apache.org/jira/browse/IGNITE-8020
> > > > >> > [4]
> > > > >> >
> > > > >>
> > >
> https://cwiki.apache.org/confluence/display/IGNITE/IEP-28%3A+Cluster+peer-2-peer+balancing
> > > > >> >
> > > > >>
> > > > >
> > >
>
