On Mon, Dec 14, 2015 at 3:16 PM, Andrey Kornev <[email protected]> wrote:
> Hey Denis,
>
> Thanks for your reply! The race you've described is definitely possible.
>
> However, it seems SqlQuery would also be vulnerable to the same race
> condition. I broadcast the tasks to all nodes, and while each task is trying
> to start a local SqlQuery on its node, a new node joins the cluster.
> Wouldn't I run into the same issue?

Andrey, SqlQuery or ScanQuery should guarantee that you are running on locked
partitions that are in a complete state. If a partition has not been filled
yet, e.g. because a node has just joined, then the copy of that partition that
has full state on another node will be picked for the query.

> I'd like to emphasize that I'm not talking about a single node launching a
> query against the cluster. I'd like to process my data on each node in
> parallel, but only the data that matches a certain SQL query.
>
> Thanks
> Andrey
>
> ------------------------------
> Subject: Re: Computation on NodeEntries
> To: [email protected]
> From: [email protected]
> Date: Mon, 14 Dec 2015 10:53:04 +0300
>
> Hi Andrey,
>
> Please see below.
>
> On 12/13/2015 9:08 AM, Andrey Kornev wrote:
>
> Yakov,
>
> If partitions do not migrate while they are being iterated over, wouldn't it
> then suffice to simply execute a single ScanQuery with its isLocal set to
> true? My reasoning here is that the scan would create an iterator for all
> affinity partitions, thus preventing their migration. If that's not the
> case, how would a local ScanQuery behave in the presence of topology
> changes?
>
> From what I see in the code, setting ScanQuery's isLocal to 'true' gives the
> ability to iterate over all the partitions that belonged to a node at the
> time the query was started. None of these partitions will be moved to
> another node until the query's iterator is closed.
>
> However, here I see the following issue.
> Imagine that your cluster has two nodes, you decide to iterate over all
> local partitions of both nodes, and the execution sequence looks like this:
>
> 1) The ScanQuery with isLocal=true starts executing on *node A*. All of its
> partitions are locked and won't be moved.
> 2) *Node B* receives the same compute job with the ScanQuery in it.
> However, because of the OS scheduler, the thread in charge of starting the
> query is blocked for some time. So the iterator over node B's local
> partitions is not ready yet, and its partitions are not locked.
> 3) A third node, *node C*, joins the topology. Partitions owned by *node B*
> may be rebalanced among *node A* and *node C*.
> 4) Partitions rebalanced from node B to *node A* won't be visited by your
> code, because node A's iterator was already built, while node B's iterator
> is constructed after the rebalancing.
>
> This issue can't happen when you specify partitions explicitly using
> Yakov's approach below. In the worst case, in a situation like the one
> above, the data of a just-rebalanced partition will be transferred to the
> node that was initially its owner (at the time you calculated the partition
> owners).
>
> Also, what's the best way to handle topology changes while using SqlQuery
> rather than ScanQuery? Basically, it's the same use case, only instead of
> scanning the entire partition I'd like to first filter the cache entries
> using a query.
>
> SqlQueries will work transparently for you and are guaranteed to return a
> full and consistent result set even if the topology changes while a query
> is in progress.
>
> --
> Denis
>
> Thanks
> Andrey
> _____________________________
> From: Yakov Zhdanov <[email protected]>
> Sent: Friday, December 11, 2015 10:55 AM
> Subject: RE: Computation on NodeEntries
> To: <[email protected]>
>
> A partition will not migrate while a local or remote iterator over it is
> not finished/closed.
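To make the four-step race concrete, here is a minimal, self-contained Java simulation. It does not use Ignite at all: partition ownership is modeled as a hypothetical `Map<Integer, String>` snapshot (partition to node name), and which partitions move when node C joins is invented for illustration. It contrasts two isLocal=true scans whose iterators are built at different moments with Yakov's explicit-partition approach, where owners are computed once up front.

```java
import java.util.*;
import java.util.stream.Collectors;

public class ScanRaceSketch {
    // Hypothetical ownership before node C joins: A owns 0-2, B owns 3-5.
    static Map<Integer, String> before() {
        return new TreeMap<>(Map.of(0, "A", 1, "A", 2, "A", 3, "B", 4, "B", 5, "B"));
    }

    // Hypothetical ownership after rebalancing: partition 3 moves to A, 4 moves to C.
    static Map<Integer, String> after() {
        Map<Integer, String> m = before();
        m.put(3, "A");
        m.put(4, "C");
        return m;
    }

    /** Partitions owned by the given node in an ownership snapshot. */
    static Set<Integer> ownedBy(Map<Integer, String> owners, String node) {
        return owners.entrySet().stream()
            .filter(e -> e.getValue().equals(node))
            .map(Map.Entry::getKey)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    /** isLocal=true scans: A's iterator is built before the join, B's after it. */
    static Set<Integer> localScanVisited() {
        Set<Integer> visited = new TreeSet<>(ownedBy(before(), "A"));
        visited.addAll(ownedBy(after(), "B"));
        return visited;
    }

    /** Explicit partitions: owners computed once, before the join; each job scans its list. */
    static Set<Integer> explicitScanVisited() {
        Set<Integer> visited = new TreeSet<>();
        for (String node : List.of("A", "B"))
            visited.addAll(ownedBy(before(), node));
        return visited;
    }

    /** Partitions a job must fetch from another node because they moved after assignment. */
    static int remoteFetches() {
        int n = 0;
        for (String node : List.of("A", "B"))
            for (int part : ownedBy(before(), node))
                if (!after().get(part).equals(node))
                    n++;
        return n;
    }

    public static void main(String[] args) {
        System.out.println("local scans visited " + localScanVisited());
        System.out.println("explicit scans visited " + explicitScanVisited()
            + ", remote fetches: " + remoteFetches());
    }
}
```

Running it prints `local scans visited [0, 1, 2, 5]` and `explicit scans visited [0, 1, 2, 3, 4, 5], remote fetches: 2`. In this toy model partition 4, which moved to C, is missed as well, since no job was ever sent to C. With explicit partitions nothing is missed; the two moved partitions simply cost network fetches, which matches Yakov's remark about worst cases.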
>
> On Dec 11, 2015 21:05, "Andrey Kornev" <[email protected]> wrote:
>
> Great suggestion! Thank you, Yakov!
>
> Just one more question. :) Let's say the scan job is running on node A and
> processing partition 42. At the same time, a new node B joins, and partition
> 42 needs to be moved to this node. What will happen to my scan query that
> is still running on node A and iterating over the partition's entries?
> Would it finish processing the entire partition despite the change of
> ownership? Or would the query terminate at some arbitrary point once the
> partition ownership transfer has completed?
>
> Thanks a lot!
> Andrey
>
> ------------------------------
> Date: Fri, 11 Dec 2015 16:06:16 +0300
> Subject: Re: Computation on NodeEntries
> From: [email protected]
> To: [email protected]
>
> Guys, I would do the following:
>
> 1. Map all my partitions to nodes:
>    org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
> 2. Send jobs (each with its list of partitions) to each node using the map
>    returned in step 1.
> 3. The job may look like this:
>
>    new IgniteRunnable() {
>        @Override public void run() {
>            for (Integer part : parts) {
>                // Closing the cursor releases the partition, so it can
>                // migrate again.
>                try (QueryCursor<Cache.Entry<Object, Object>> cur =
>                         cache.query(new ScanQuery<Object, Object>(part))) {
>                    for (Cache.Entry<Object, Object> e : cur) {
>                        // do the stuff...
>                    }
>                }
>            }
>        }
>    };
>
> In some worst cases, when the topology changes under your feet, this may
> result in network calls, but even then it should work.
>
> --Yakov
>
> 2015-12-11 2:13 GMT+03:00 Andrey Kornev <[email protected]>:
>
> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to:
> - somehow detect the change,
> - cancel the tasks on all the nodes,
> - wait until the rebalancing is finished, and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
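Steps 1 and 2 of Yakov's suggestion boil down to inverting a partition-to-node map into one partition list per node. The sketch below is plain Java: node IDs are strings standing in for `ClusterNode`, and `roundRobin` is a hypothetical affinity used only to produce input. In Ignite, the input map would come from `Affinity#mapPartitionsToNodes`, and each resulting list would become the `parts` of the job sent to that node (for example via a cluster group built with `forNode`; that wiring is left out here).

```java
import java.util.*;

public class PartitionGrouping {
    /** Hypothetical affinity: partition p is owned by "node-(p % nodes)". */
    static Map<Integer, String> roundRobin(int parts, int nodes) {
        Map<Integer, String> partToNode = new HashMap<>();
        for (int p = 0; p < parts; p++)
            partToNode.put(p, "node-" + (p % nodes));
        return partToNode;
    }

    /**
     * Inverts partition -> node (the shape of Affinity#mapPartitionsToNodes,
     * with strings standing in for ClusterNode) into node -> sorted partition
     * list, i.e. one job payload per node. Every partition lands in exactly
     * one list, so no partition is scanned twice or skipped.
     */
    static Map<String, List<Integer>> groupByNode(Map<Integer, String> partToNode) {
        Map<String, List<Integer>> byNode = new TreeMap<>();
        for (Map.Entry<Integer, String> e : partToNode.entrySet())
            byNode.computeIfAbsent(e.getValue(), node -> new ArrayList<>()).add(e.getKey());
        for (List<Integer> parts : byNode.values())
            Collections.sort(parts);
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> jobs = groupByNode(roundRobin(8, 3));
        // In Ignite, each entry would become one job sent to that node (step 2).
        jobs.forEach((node, parts) -> System.out.println(node + " scans " + parts));
    }
}
```

With 8 partitions over 3 nodes this prints `node-0 scans [0, 3, 6]`, `node-1 scans [1, 4, 7]` and `node-2 scans [2, 5]`. Because the assignment is fixed before any job starts, a later rebalance changes only where the data is read from, not which job is responsible for it.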

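The restart strategy Andrey lists (detect the change, cancel, wait for rebalancing, restart) can be sketched as a retry loop keyed on a topology version. This is an assumption-laden sketch, not an Ignite API: `topVer` and `awaitStable` are hypothetical hooks (one plausible source for the version is `IgniteCluster#topologyVersion()`), and cancellation is simplified to discarding a result computed while the version changed.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class RestartOnTopologyChange {
    /**
     * Runs the computation and keeps its result only if the (hypothetical)
     * topology version is unchanged from start to finish. Otherwise the
     * result is discarded, awaitStable is invoked (standing in for "wait
     * until rebalancing is finished") and the computation is restarted.
     */
    static <T> T runStable(LongSupplier topVer, Runnable awaitStable,
                           Supplier<T> computation, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long before = topVer.getAsLong();
            T result = computation.get();
            if (topVer.getAsLong() == before)
                return result;      // no topology change observed: accept
            awaitStable.run();      // topology changed mid-run: wait, then retry
        }
        throw new IllegalStateException("topology kept changing after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        long[] ver = {1};
        int[] runs = {0};
        String result = runStable(() -> ver[0], () -> { }, () -> {
            runs[0]++;
            if (runs[0] == 1)
                ver[0]++;           // hypothetical: a node joins during the first run
            return "processed on topology version " + ver[0];
        }, 3);
        System.out.println(result + " after " + runs[0] + " runs");
    }
}
```

Here the first run is discarded and the program prints `processed on topology version 2 after 2 runs`. Note that this yields exactly-once *results* only: if the per-entry processing has side effects, a discarded attempt has already performed some of them, so the processing would need to be idempotent for the overall effect to be exactly-once.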