Please see my comments inline. I've tried my best to be as brief as possible, 
but I'm not sure I've succeeded. My sincere apologies.

But first I'd like to step back and clarify CQ use cases, as I see them.

Use case 1: stateless event filter.
Use case 2: stateful view of the data in cache.

For the first use case, the initial state of the cache is immaterial, so no 
initial point-in-time snapshot is required.

The second use case is pretty common in finance. For example, a bank would like 
to track the value of a portfolio in real-time. One way to do it would be:
- first, build the current state of the portfolio by running an initial 
point-in-time query (while holding back any qualifying events that may have 
occurred in the meantime);
- next, start processing events as they arrive. The events that occurred while 
the initial snapshot was being built get delivered now, followed by real-time 
events.
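
To make the two steps above concrete, here is a minimal sketch in plain Java. 
It uses no Ignite types, and the class and method names (PortfolioView, 
onEvent, onSnapshot) are made up for illustration. It assumes that each event 
carries the full new value of an entry, so replaying a held-back event over 
the snapshot is safe.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical view: built from a point-in-time snapshot, then kept current by events. */
class PortfolioView {
    private final Map<String, Double> positions = new HashMap<>();
    private final List<Map.Entry<String, Double>> heldBack = new ArrayList<>();
    private boolean snapshotDone;

    /** Called for every change event; events arriving before the snapshot is loaded are held back. */
    synchronized void onEvent(String key, double value) {
        if (snapshotDone)
            positions.put(key, value);
        else
            heldBack.add(new AbstractMap.SimpleImmutableEntry<String, Double>(key, value));
    }

    /** Called once with the initial query result; replays held-back events in arrival order. */
    synchronized void onSnapshot(Map<String, Double> snapshot) {
        positions.putAll(snapshot);
        for (Map.Entry<String, Double> e : heldBack)
            positions.put(e.getKey(), e.getValue());
        heldBack.clear();
        snapshotDone = true;
    }

    /** Current value of the whole portfolio. */
    synchronized double total() {
        double sum = 0;
        for (double v : positions.values())
            sum += v;
        return sum;
    }
}
```

The point is only that the hold-back and the switch-over have to happen in one 
place; today the user has no hook for doing this reliably.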

The existing CQ API is more than sufficient for the first use case, but it is 
rather incomplete with respect to the second (in my opinion, that is). Of 
course, if the use case is considered a non-goal for the project, then please 
feel free to pretty much ignore the rest of this post. Otherwise, scroll down 
to where it gets real! :)

Thanks
Andrey

> From: Dmitriy Setrakyan <[email protected]>
> Subject: Re: Continuous Query
> Date:     Wed, 29 Apr 2015 05:21:05 GMT
>
> On Tue, Apr 28, 2015 at 3:52 PM, Kornev, Andrey <[email protected]>
> wrote:
> 
> > Hello,
> >
> > There are a couple of things wrt Ignite's CQ API and implementation I'd
> > like to bring the community's attention to.
> >
> > First, a CQ instance is a long living resource. Once started it continues
> > to run until explicitly stopped by closing its cursor. If the query master
> > node (the one holding the instance of the QueryCursor) crashes and the Auto
> > Unsubscribe is off, then it doesn't seem there is any way to stop the CQ
> > save for a complete restart of the grid. Making it possible to obtain the
> > instance of the CQ from any grid node, might improve things.
> >
>
> Agree, this sounds like API limitation. I will file a ticket.
> 
> 
> >
> > Second, the purpose of the initial query and its usage in the current API
> > is not clear. It makes one wonder what was the original use case the API
> > was designed to address?
> >
> > A couple of things:
> >
> > 1) the implementation doesn't provide a consistent point-in-time snapshot
> > of the cache (no isolation). The cursor may deliver a more recent version
> > of an entry if it got updated by a concurrent transaction. The same entry
> > will also be delivered to the listener as an update event. Please correct
> > me if I'm wrong.
> >
> 
> Well, it depends which query you use. If you use SqlQuery or SqlFieldsQuery
> as initial query for CQ, then you do get point-in-time isolation (Sergi,
> please correct me if I am wrong here). For ScanQuery you do not get any
> isolation, as it is a plain iteration through cache with a predicate.
> 
It's not immediately obvious from the API or the javadocs that the choice of 
the query type would have such important consequences for the CQ execution. In 
fact, if the scan indeed doesn't produce a consistent snapshot, then it should 
not be allowed as the initial query of a CQ, to prevent users from creating 
hard-to-catch bugs. One way to achieve this would be to define the 
ContinuousQuery.setInitialQuery() method for the SQL-based query types only.

However, now we have a usability (and potentially correctness) issue, namely the 
disparity between a SQL-based initial query and a programmatic (non SQL-based) 
real-time filter. Somehow one must ensure that both are equivalent: in other 
words, the results of the query and the filter applied to the same data set 
should be identical. It means that I have to express the same condition twice: 
in SQL and in Java. It is especially tricky when the CQ gets started in 
response to some user action (GUI, for example) and the action defines the 
query dynamically: "I want to start tracking my USD portfolio". In such a case, 
one would have to somehow generate 2 consistent representations of the same 
query: a SQL string for the initial and an instance of 
CacheEntryEventSerializableFilter for the real-time.

Possible solutions: 
- make ScanQuery consistent (read isolation).
- make it possible to create a filter that encapsulates a SQL statement and use 
it as the real-time filter.

I'm guessing none of these are simple. I'd vote for the first one, since 
real-time evaluation of relational queries is a tricky business especially if 
joins are involved.
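
For what it's worth, until something like that exists, one could at least keep 
the two forms in sync on the user side by deriving both from a single 
description of the condition. A rough sketch in plain Java follows; Position 
and CurrencyEquals are made-up names, and a real version would have to return 
a CacheEntryEventSerializableFilter rather than a plain Predicate.

```java
import java.util.function.Predicate;

/** Hypothetical value type stored in the cache. */
class Position {
    final String currency;
    final double amount;

    Position(String currency, double amount) {
        this.currency = currency;
        this.amount = amount;
    }
}

/** Hypothetical single source of truth for the condition "currency equals X". */
class CurrencyEquals {
    private final String currency;

    CurrencyEquals(String currency) {
        this.currency = currency;
    }

    /** WHERE fragment for the initial SQL query; the value is passed as a bound argument. */
    String sqlWhere() {
        return "currency = ?";
    }

    /** Arguments matching the placeholders in sqlWhere(). */
    Object[] sqlArgs() {
        return new Object[] { currency };
    }

    /** The same condition as an in-memory predicate for the real-time filter. */
    Predicate<Position> asFilter() {
        return p -> currency.equals(p.currency);
    }
}
```

This only works for simple conditions, of course, and it still leaves the 
equivalence of the two forms up to whoever writes such a class.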

> 
> > 2) the delivery of the initial query results is in no way synchronized
> > with the delivery of the events to the listener.
> >
>
> Yes, you are right.
>
> > This makes the API prone to race conditions and its correct usage
> > impossible. By "correct usage" I mean the ability to capture point in time
> > state of the cache followed by the correctly ordered change data events
> > including the ones that occurred while the initial snapshot was being
> > processed. In database systems it is also known as "materialized view
> > maintenance".
> >
> > It'd be more practical to deliver the initial state to the listener
> > instance rather than to the cursor executing in a different thread. It'd
> > also be necessary to punctuate the end of the initial state delivery and
> > the beginning of the change data events, so that the listener could switch
> > from building its initial state to applying incremental updates.
> >
> > I'm curious if any of the above makes any sense?
>
>
> This makes sense to me. I think our CQ APIs should provide a way to return
> initial results as listener notifications as well, instead of returning
> them in a collection. How would you punctuate the end of initial result set
> and beginning of the event notifications?
>
The punctuation can be done the way Yakov has suggested by adding an attribute 
in the instance of the CacheEntry that gets passed into the query listener. 
Another option is to define a specific ContinuousQueryListener interface (that 
may extend JCache's CacheEntryUpdatedListener used now) that would add three 
additional methods, something like this:

interface ContinuousQueryListener<K,V> extends CacheEntryUpdatedListener<K,V> {

    /** Notifies that the CQ is about to start delivering the results of the initial query. */
    void onInitialStart();

    /**
     * Delivers the next batch of the initial entries. Notice, these are *NOT* events,
     * but cache entries (facts).
     */
    void onInitialNext(Iterable<CacheEntry<K,V>> entry);

    /**
     * Indicates that all initial entries have been delivered and the real-time events
     * will from this moment on be delivered to CacheEntryUpdatedListener.onUpdated().
     */
    void onInitialComplete();
}

As with the regular cache listeners, if an implementation of the CQ listener 
implements Closeable, then as per the JCache spec Closeable.close() should be 
called when the CQ instance is closed.

One last thing. For materialized view maintenance it'

> > Thanks
> > Andrey                                        
