There will be two instances, and these instances will have different lifetimes.
> it seems weird that there are two different interfaces for essentially the same purpose

They are a bit different: one operates on ICacheEntry, the other on ICacheEntryEvent (which carries additional data).

On Wed, Apr 25, 2018 at 8:40 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

Let's say I do this, where I define a single class RemoteFileFilter that implements both remote filter interfaces (BTW, it seems weird that there are two different interfaces for essentially the same purpose):

    public class RemoteFileFilter :
        ICacheEntryFilter<BufferQueueKey, BufferQueueItem>,
        ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>

    RemoteFileFilter FileFilter = new RemoteFileFilter();

    // Construct the continuous query machinery
    // Set the initial query to return all elements in the cache
    // Instantiate the queryHandle and start the continuous query on the remote nodes
    // Note: Only cache items held on this local node will be handled here
    IContinuousQueryHandle<ICacheEntry<BufferQueueKey, BufferQueueItem>> queryHandle =
        queueCache.QueryContinuous(
            qry: new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new LocalFileListener())
            {
                Filter = FileFilter
            },
            initialQry: new ScanQuery<BufferQueueKey, BufferQueueItem>
            {
                Filter = FileFilter
            });

Here, the same filter is supplied to both the continuous query and the initial scan query. When this continuous query is serialized, sent to the remote node, then deserialised, will the continuous query on the remote node retain two references to the same remote filter, or two instances of the remote filter?

Thanks,
Raymond.

From: Pavel Tupitsyn [mailto:ptupit...@apache.org]
Sent: Wednesday, April 25, 2018 5:19 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

What do you mean by "instance"? In terms of CLR that would be a different instance on every node.

On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

I'm using a Continuous Query in both options (a grid-deployed service using a CQ versus an independent context using a CQ). I was curious which context using a CQ would be seen as desirable.

In the case where a filter is provided to a CQ for both the initial query and for newly arrived items in the cache, I would need to supply the same filter instance for both, as the processing logic has state that will need to be shared between the two. Once the CQ has been serialized to the remote nodes, will that filter be two separate instances or will it retain the same singular instance?

From: Pavel Tupitsyn [mailto:ptupit...@apache.org]
Sent: Wednesday, April 25, 2018 6:08 AM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

ContinuousQuery is the best practice for most kinds of streaming use cases. I think it fits your use case as well.

On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

Thanks, that makes sense.
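[Editor's sketch, not from the thread: a minimal illustration of a single filter class serving both roles, as discussed above. Type names follow the thread; the ProcessLocally helper is a placeholder. Because the continuous-query filter and the initial-scan filter are serialized and deserialised independently, any instance state kept in the filter is not shared between the two roles on the remote node.]

    public class RemoteFileFilter :
        ICacheEntryFilter<BufferQueueKey, BufferQueueItem>,
        ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>
    {
        // Invoked on the primary node for entries returned by the initial ScanQuery.
        public bool Invoke(ICacheEntry<BufferQueueKey, BufferQueueItem> entry)
        {
            ProcessLocally(entry.Key, entry.Value);
            return false; // nothing is sent back to the initiator node
        }

        // Invoked on the primary node for update events seen by the continuous query.
        public bool Evaluate(ICacheEntryEvent<BufferQueueKey, BufferQueueItem> evt)
        {
            ProcessLocally(evt.Key, evt.Value);
            return false; // nothing is forwarded to the local listener
        }

        private void ProcessLocally(BufferQueueKey key, BufferQueueItem item)
        {
            // Placeholder processing. Note: the ScanQuery filter and the CQ filter
            // are deserialised as two separate instances, so state held here is
            // not shared between the two roles.
        }
    }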
From a best practices perspective, is it better to have a grid-deployed service on each node executing local continuous queries against the cache and orchestrating the processing from within the service, versus having some singular context in the grid that uses the continuous query by placing processing orchestration logic in the filter sent to the remote nodes?

Sent from my iPhone

On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <ptupit...@apache.org> wrote:

Sorry, looks like I have misunderstood you.

If you need an initial scan, of course you can have it by using a ScanQuery as the initialQuery.

Place all the processing logic into the ScanQuery filter, and return false from there.

This way you can process all existing entries in a co-located fashion without sending them to the initiator node.

Thanks,
Pavel

On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

Not being able to do an initial scan of elements on the remote nodes is a bit of a problem (possibly a bug?)

Something that's occurred to me is to wrap this behaviour into an Ignite service deployed onto all of the server nodes, and use a local mode continuous query from within each service to perform an initial scan of elements and then steady state handling as new elements arrive.

The reason the initial scan is important is that I need to handle cases where there may be a non-trivial queue of items waiting for processing and there is either a shutdown/restart of the grid, or a topology change event that triggers rebalancing.

From: Pavel Tupitsyn [mailto:ptupit...@apache.org]
Sent: Tuesday, April 24, 2018 5:54 AM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

> Is the initial query also run in the context of the remote node and the remote filter?

No, it is just a query (can be SQL or Scan) which allows you to get a "full picture" on the calling node: all existing data and all future data.

So in your scenario it is not very useful.

> return false from the filter so the element is not sent to the local listener

Yes, exactly.

On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

OK – I see how that works.

In the page https://apacheignite-net.readme.io/docs/continuous-queries, there is this code:

    using (var queryHandle = cache.QueryContinuous(qry, initialQry))
    {
        // Iterate through existing data stored in cache.
        foreach (var entry in queryHandle.GetInitialQueryCursor())
            Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);

        // Add a few more keys and watch a few more query notifications.
        for (int i = 5; i < 15; i++)
            cache.Put(i, i.ToString());
    }

Is the initial query also run in the context of the remote node and the remote filter?

Construction of the ContinuousQuery also requires provision of a LocalListener to receive the cache update items. Is the approach here to process the element in the remote filter context and then return false from the filter so the element is not sent to the local listener?
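[Editor's sketch, not from the thread: the wiring described above, assuming a no-op local listener (NoOpListener, introduced here purely for illustration) is acceptable. The same filter instance is passed to both the continuous query and the initial ScanQuery, and both paths return false, so nothing travels back to the initiator node.]

    var filter = new RemoteFileFilter();

    var cq = new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new NoOpListener())
    {
        Filter = filter
    };

    var initialQry = new ScanQuery<BufferQueueKey, BufferQueueItem> { Filter = filter };

    using (var handle = queueCache.QueryContinuous(cq, initialQry))
    {
        // Draining the initial cursor drives the initial scan on the primary nodes;
        // it yields nothing because the filter returns false for every entry it processes.
        foreach (var _ in handle.GetInitialQueryCursor()) { }

        // Keep the handle alive for as long as newly arriving entries must be processed.
    }

    class NoOpListener : ICacheEntryEventListener<BufferQueueKey, BufferQueueItem>
    {
        public void OnEvent(IEnumerable<ICacheEntryEvent<BufferQueueKey, BufferQueueItem>> events) { }
    }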
From: Pavel Tupitsyn [mailto:ptupit...@apache.org]
Sent: Monday, April 23, 2018 7:50 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

The remote filter is deployed on every cache node and is invoked only on the primary node for that key.

In other words, for each key there is only one invocation of the remote filter, and that invocation is local to that key.

So you can place your processing logic into the remote filter.

On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

Hi Pavel,

Yes, I looked at continuous queries. They appear to be oriented toward a single context being sent the newly arrived elements in the cache from all primary nodes hosting the cache involved in the query.

In the use case I outlined below, I would like to have the items processed in co-located contexts (ie: the data does not move and is processed in situ on the primary node). How do you do that with a continuous query?

Thanks,
Raymond.

From: Pavel Tupitsyn [mailto:ptupit...@apache.org]
Sent: Monday, April 23, 2018 7:18 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Hi Raymond,

To process incoming data in a co-located fashion there is the Continuous Query feature [1]. Looks like it fits your use case quite well.

[1] https://apacheignite-net.readme.io/docs/continuous-queries

On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <raymond_wil...@trimble.com> wrote:

I did find the ICache.GetLocalEntries() method and have written the following as a proof of concept (yet to exercise it though):

    IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
        QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });

    ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();

    if (first != null)
    {
        // Get the list of all items in the buffer matching the affinity key of the first item
        // in the list, limiting the result set to 100 TAG files.
        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .Take(100)
            .ToList();

        if (candidates?.Count > 0)
        {
            // Submit the list of items to the processor
            // ...
        }
    }

This seems like it should do what I want, but I'm a little suspicious that it may evaluate the entire content of the cache against the Where() condition before taking the first 100 results.

I think I can constrain it by modifying the LINQ expression like this:

    List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
        .Take(100)
        .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
        .ToList();

which will at least limit the overall number examined to 100, while not capturing the first 100 that do match.

I could further modify it to a 'double take', which still constrains the overall query but improves the chances of filling the maximum take of 100 matching items:

    List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
        .Take(1000)
        .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
        .Take(100)
        .ToList();

Or is there a better way?

Thanks,
Raymond.
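[Editor's note on the LINQ concern above: GetLocalEntries returns a plain IEnumerable, so LINQ-to-Objects evaluates Where/Take lazily. A .Where(...).Take(100).ToList() chain stops enumerating as soon as 100 matching entries have been yielded, and only walks the entire local entry set when fewer than 100 entries match. A sketch using the names from the message above:]

    // Lazily enumerates local primary entries; Take(100) short-circuits the
    // enumeration once 100 matching entries have been yielded.
    IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
        QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });

    var first = localItems.FirstOrDefault();
    if (first != null)
    {
        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .Take(100)
            .ToList();
    }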
From: Raymond Wilson [mailto:raymond_wil...@trimble.com]
Sent: Monday, April 23, 2018 1:11 PM
To: user@ignite.apache.org
Subject: Using a cache as an affinity co-located processing buffer in Ignite.Net

All,

I have been thinking about how to use Ignite.Net to support an affinity co-located ingest pipeline that uses queue buffering to provide fault tolerance and buffering for a flow of ingest packages.

At a high level, it looks like this:

Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity co-located with PackageProcessor]
Processing pipeline: [PackageCache] -> [PackageProcessor] -> [ProcessedDataCache, affinity co-located with PackageProcessor]

Essentially, I want a cache that looks like this:

    public class CacheItem
    {
        public DateTime Date;

        [AffinityKeyMapped]
        public Guid AffinityKey;

        public byte[] Package;
    }

    ICache<string, CacheItem> BufferQueue;

    BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
        new CacheConfiguration
        {
            Name = "BufferQueue",

            KeepBinaryInStore = true,

            // Partition the cache across nodes
            CacheMode = CacheMode.Partitioned,
        });

This queue will target a data region that is configured for persistence.

Inbound packages will arrive and be injected into the BufferQueue cache from some client node context, like this:

    public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
    {
        BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey = affinityKey, Package = package });
    }

There will be a collection of server nodes that are responsible for the cache.

This is all straightforward. The tricky bit is then processing the elements in the BufferQueue cache.

The data is already on the server nodes, nicely co-located according to its affinity. I want to have parallel processing logic that runs on the server nodes that pulls elements from the buffer queue and processes them into some other cache(s).

At this point I know I have a cache that may contain something needing to be processed, but I don't know their keys. I know it's possible to have logic running on each server node that does this (either as a Grid Service or a Compute::Broadcast() lambda):

    var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
    var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

    foreach (var cacheEntry in cursor)
        ProcessItem(cacheEntry);

...but I am not sure how to restrict the elements in the cache returned to the query to be only those entries affinity co-located with the server asking for them.

Is this so obvious that it just works and does not need documentation, or is this not possible and I should run the processing context from a client node context (as above) and pay the penalty of extracting the packages from the cache with cache.Query() and then resubmitting them using an affinity aware method like AffinityRun()?

Thanks,
Raymond.
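[Editor's sketch, not from the thread: one way to keep the processing on the server nodes is to broadcast a compute action and, on each node, run a scan with the query's Local flag set so only entries held on that node are returned. ProcessLocalBuffer and ProcessItem are placeholder names, and the Local flag on the scan query is the assumption this sketch rests on.]

    public class ProcessLocalBuffer : IComputeAction
    {
        // Ignite instance injected on the executing node.
        [InstanceResource] private IIgnite _ignite;

        public void Invoke()
        {
            var cache = _ignite.GetCache<string, CacheItem>("BufferQueue");

            // Local = true restricts the scan to entries held on this node.
            var query = new ScanQuery<string, CacheItem> { Local = true };

            foreach (var entry in cache.Query(query))
                ProcessItem(entry); // placeholder for the real processing
        }

        private static void ProcessItem(ICacheEntry<string, CacheItem> entry) { }
    }

    // From a client node, run the action once on every server node:
    // ignite.GetCluster().ForServers().GetCompute().Broadcast(new ProcessLocalBuffer());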