I’m using a Continuous Query in both options (a grid deployed service using
a CQ versus an independent context using a CQ). I was curious which of the
two contexts would be considered preferable.



In the case where a filter is provided to a CQ for both the initial query
and for new items arriving in the cache, I would need to supply the same
filter instance for both, as the processing logic has state that will need
to be shared between the two. Once the CQ has been serialized to the remote
nodes, will that filter be two separate instances or will it remain a
single instance?



*From:* Pavel Tupitsyn [mailto:ptupit...@apache.org]
*Sent:* Wednesday, April 25, 2018 6:08 AM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



ContinuousQuery is the best practice for most kinds of streaming use cases.
I think it fits your use case as well.



On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <raymond_wil...@trimble.com>
wrote:

Thanks, that makes sense.



From a best practices perspective, is it better to have a grid deployed
service on each node executing local continuous queries against the cache
and orchestrating the processing from within the service, versus having
some singular context in the grid that uses the continuous query by placing
processing orchestration logic in the filter sent to the remote nodes?



On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <ptupit...@apache.org> wrote:

Sorry, looks like I have misunderstood you.



If you need initial scan, of course you can have it by using ScanQuery as
initialQuery.

Place all the processing logic into the ScanQuery filter, and return false
from there.

This way you can process all existing entries in a co-located fashion
without sending them to the initiator node.
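
A minimal sketch of that suggestion, assuming string keys and byte[] values
for brevity. ExistingEntryProcessor and NoOpListener are hypothetical names
used only for illustration, and the filter class is assumed to be available
on the server nodes. The remote filter for newly arriving entries is left
out here to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Hypothetical scan filter: does the work where the entry lives and
// rejects every entry so nothing is shipped back to the initiator.
[Serializable]
public class ExistingEntryProcessor : ICacheEntryFilter<string, byte[]>
{
    public bool Invoke(ICacheEntry<string, byte[]> entry)
    {
        // Placeholder for the real co-located processing.
        Console.WriteLine("Processing existing entry {0}", entry.Key);

        return false;
    }
}

// ContinuousQuery requires a listener; this one intentionally does nothing.
public class NoOpListener : ICacheEntryEventListener<string, byte[]>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<string, byte[]>> evts)
    {
    }
}

public static class Program
{
    public static void Main()
    {
        using (var ignite = Ignition.Start())
        {
            var cache = ignite.GetOrCreateCache<string, byte[]>("BufferQueue");
            cache.Put("already-queued", new byte[] { 1, 2, 3 });

            var qry = new ContinuousQuery<string, byte[]>(new NoOpListener());
            var initialQry = new ScanQuery<string, byte[]>(new ExistingEntryProcessor());

            using (var handle = cache.QueryContinuous(qry, initialQry))
            {
                // Drain the initial cursor. Because the filter rejects every
                // entry, no entries are expected to come back here.
                foreach (var entry in handle.GetInitialQueryCursor())
                {
                    Console.WriteLine("Unexpected entry: {0}", entry.Key);
                }
            }
        }
    }
}
```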



Thanks,

Pavel



On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <raymond_wil...@trimble.com>
wrote:

Not being able to do an initial scan of elements on the remote nodes is a
bit of a problem (possibly a bug?)



Something that’s occurred to me is to wrap this behaviour into an Ignite
service deployed onto all of the server nodes, and use a local mode
continuous query from within each service to perform an initial scan of
elements and then steady state handling as new elements arrive.



The reason the initial scan is important is that I need to handle cases
where there may be a non-trivial queue of items waiting for processing and
there is either a shutdown/restart of the grid, or a topology change event
that triggers rebalancing.



*From:* Pavel Tupitsyn [mailto:ptupit...@apache.org]
*Sent:* Tuesday, April 24, 2018 5:54 AM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



>  Is the initial query also run in the context of the remote node and the
remote filter?

No, it is just a query (can be SQL or Scan) which allows you to get a "full
picture" on the calling node:

all existing data and all future data.



So in your scenario it is not very useful.



>   return false from the filter so the element is not sent to the local
listener

Yes, exactly



On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <raymond_wil...@trimble.com>
wrote:

OK – I see how that works.



In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
there is this code:



using (var queryHandle = cache.QueryContinuous(qry, initialQry))
{
    // Iterate through existing data stored in cache.
    foreach (var entry in queryHandle.GetInitialQueryCursor())
        Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);

    // Add a few more keys and watch a few more query notifications.
    for (int i = 5; i < 15; i++)
        cache.Put(i, i.ToString());
}



Is the initial query also run in the context of the remote node and the
remote filter?



Construction of the ContinuousQuery also requires a LocalListener to
receive the cache update items. Is the approach here to process the element
in the remote filter context and then return false from the filter so the
element is not sent to the local listener?





*From:* Pavel Tupitsyn [mailto:ptupit...@apache.org]
*Sent:* Monday, April 23, 2018 7:50 PM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



The Remote Filter is deployed on every cache node and is invoked only on
the primary node for a given key.

In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.



So you can place your processing logic into the Remote Filter.
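
As a rough sketch of that approach (not taken from the Ignite
documentation): the remote filter does the work in place and returns false,
so the event is never forwarded to the local listener.
PackageProcessingFilter, NoOpListener and the byte[] value type are
hypothetical choices made for illustration, and the filter class is assumed
to be available on the server nodes.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Hypothetical remote filter: evaluated on the primary node for each key.
[Serializable]
public class PackageProcessingFilter : ICacheEntryEventFilter<string, byte[]>
{
    public bool Evaluate(ICacheEntryEvent<string, byte[]> evt)
    {
        if (evt.EventType == CacheEntryEventType.Created)
        {
            // Placeholder for the real co-located processing.
            Console.WriteLine("Processing {0} ({1} bytes) in place",
                evt.Key, evt.Value.Length);
        }

        // Returning false keeps the event away from the local listener.
        return false;
    }
}

// ContinuousQuery requires a listener; this one intentionally does nothing.
public class NoOpListener : ICacheEntryEventListener<string, byte[]>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<string, byte[]>> evts)
    {
    }
}

public static class Program
{
    public static void Main()
    {
        using (var ignite = Ignition.Start())
        {
            var cache = ignite.GetOrCreateCache<string, byte[]>("BufferQueue");

            var qry = new ContinuousQuery<string, byte[]>(new NoOpListener())
            {
                Filter = new PackageProcessingFilter()
            };

            // The query stays active until the handle is disposed.
            using (cache.QueryContinuous(qry))
            {
                cache.Put("package-1", new byte[] { 1, 2, 3 });

                // Keep the query alive until Enter is pressed.
                Console.ReadLine();
            }
        }
    }
}
```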



On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <raymond_wil...@trimble.com>
wrote:

Hi Pavel,



Yes, I looked at continuous queries. They appear to be oriented toward a
single context being sent the newly arrived elements in the cache from all
primary nodes hosting the cache involved in the query.



In the use case I outlined below, I would like to have the items processed
in co-located contexts (ie: the data does not move and is processed in situ
on the primary node). How do you do that with a continuous query?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupit...@apache.org]
*Sent:* Monday, April 23, 2018 7:18 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



To process incoming data in a co-located fashion there is a Continuous
Query feature [1].

Looks like it fits your use case quite well.





[1] https://apacheignite-net.readme.io/docs/continuous-queries



On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <raymond_wil...@trimble.com>
wrote:

I did find the ICache.GetLocalEntries() method and have written the
following as a proof of concept (yet to exercise it though):



    IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
        QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});

    ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();

    if (first != null)
    {
        // Get the list of all items in the buffer matching the affinity key
        // of the first item in the list, limiting the result set to 100 TAG files.
        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .Take(100)
            .ToList();

        if (candidates?.Count > 0)
        {
            // Submit the list of items to the processor
            // ...
        }
    }



This seems like it should do what I want, but I’m a little suspicious that
it may evaluate the entire content of the cache against the Where()
condition before taking the first 100 results.
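
On that suspicion: LINQ to Objects evaluates Where() and Take() lazily, so
enumeration stops as soon as Take() has received its quota. The sketch
below illustrates this with a counting source. Source() is a hypothetical
stand-in for the local entries, and the same behaviour would only carry
over if GetLocalEntries() hands back a lazily enumerated sequence, which is
an assumption here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyTakeDemo
{
    // Hypothetical stand-in for the local cache entries: yields items one at
    // a time and reports each item that is actually pulled by the consumer.
    private static IEnumerable<int> Source(int count, Action onPull)
    {
        for (int i = 0; i < count; i++)
        {
            onPull();
            yield return i;
        }
    }

    public static void Main()
    {
        int pulled = 0;

        // Every second item matches the predicate, so 100 matches are found
        // long before the end of the one-million-item source is reached.
        List<int> candidates = Source(1000000, () => pulled++)
            .Where(x => x % 2 == 0)
            .Take(100)
            .ToList();

        Console.WriteLine("matches={0}, items examined={1}",
            candidates.Count, pulled);
    }
}
```

The whole source is only walked when fewer than 100 items match, which is
the worst case for the Where().Take(100) form.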



I think I can constrain it by modifying the LINQ expression like this:



        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Take(100)
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .ToList();



That will at least limit the number of items examined to 100, but it no
longer captures the first 100 items that do match.



I could further modify it to a ‘double-take’, which still constrains the
overall query but improves the chances of filling the maximum take of 100
matching items:



        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Take(1000)
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .Take(100)
            .ToList();



Or is there a better way?



Thanks,

Raymond.



*From:* Raymond Wilson [mailto:raymond_wil...@trimble.com]
*Sent:* Monday, April 23, 2018 1:11 PM
*To:* user@ignite.apache.org
*Subject:* Using a cache as an affinity co-located processing buffer in
Ignite.Net



All,



I have been thinking about how to use Ignite.Net to support an affinity
co-located ingest pipeline that uses queue buffering to provide fault
tolerance and buffering for a flow of ingest packages.



At a high level, it looks like this:



Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity
co-located with PackageProcessor]

Processing pipeline: [PackageCache] -> [PackageProcessor] ->
[ProcessedDataCache affinity co-located with PackageProcessor]



Essentially, I want a cache that looks like this:



public class CacheItem
{
    public DateTime date;

    [AffinityKeyMapped]
    public Guid AffinityKey;

    public byte[] Package;
}



ICache<string, CacheItem> BufferQueue;



BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
    new CacheConfiguration
    {
        Name = "BufferQueue",

        KeepBinaryInStore = true,

        // Partition the entries across the server nodes
        CacheMode = CacheMode.Partitioned,
    });



This queue will target a data region that is configured for persistence.



Inbound packages will arrive and be injected into the BufferQueue cache
from some client node context, like this:



public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
{
    BufferQueue.Put(key, new CacheItem
    {
        date = DateTime.Now,
        AffinityKey = affinityKey,
        Package = package
    });
}



There will be a collection of server nodes that are responsible for the
cache.



This is all straightforward. The tricky bit is then processing the elements
in the BufferQueue cache.



The data is already on the server nodes, nicely co-located according to its
affinity. I want to have parallel processing logic that runs on the server
nodes that pulls elements from the buffer queue and processes them into
some other cache(s).



At this point I know I have a cache that may contain something needing to
be processed, but I don’t know their keys. I know it’s possible to have
logic running on each server node that does this (either as a Grid Service
or a Compute::Broadcast() lambda):



var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

foreach (var cacheEntry in cursor)
    ProcessItem(cacheEntry);



…but I am not sure how to restrict the elements in the cache returned to
the query to be only those entries affinity co-located with the server
asking for them.



Is this so obvious that it just works and does not need documentation, or
is this not possible and I should run the processing context from a client
node context (as above) and pay the penalty of extracting the packages from
the cache with cache.Query() and then resubmitting them using an affinity
aware method like AffinityRun()?



Thanks,

Raymond.
