Hi, I have a use case where I want a full copy of a replicated cache on a subset of my thick clients. In this example, I have an ETL thick client that creates and updates a replicated cache in my server grid. I then have a series of webserver thick clients that should always hold a fully up-to-date copy of that cache.

NearCache Attempt:
=============
I tried using a NearCache on the webserver thick clients, but this had two undesirable problems:
* it created a near cache on each server node, which I have no use for since this is already a replicated cache
* the near cache never received new entries from the replicated cache; it was only updated for entries it already had stored in it.

Is there a way I can resolve either of these two problems of the NearCache for my situation?

Continuous Query Attempt:
=================
This led me to instead consider Continuous Queries (CQ), where each webserver would maintain its own Map of the server cache data: on startup it uses the CQ initial query to get the current server state, and from then on it uses the CQ local listener to apply updates.

Trying to get the CQ working, I followed the examples in https://ignite.apache.org/docs/latest/configuring-caches/near-cache however I can only see the local listener updates if I run the query in my own thread that I never let finish.

What am I doing wrong in the code below?

Continuous Query code:
---------------------------
import java.util.concurrent.Executors
import org.apache.ignite.cache.query.ContinuousQuery
import org.apache.ignite.cache.query.ScanQuery

// myCache is an IgniteCache<Int, String> obtained elsewhere.

// Create new continuous query.
val qry = ContinuousQuery<Int, String>()

// Optional initial query: a plain ScanQuery so that all existing data is returned.
qry.setInitialQuery(ScanQuery())

// Don't set a remote filter, as we want all data returned.
// qry.setRemoteFilterFactory()

// Callback that is called locally when update notifications are received.
qry.setLocalListener { events ->
    println("Update notifications starting...[thread=${Thread.currentThread()}]")
    for (e in events) {
        println("Listener event: [thread=${Thread.currentThread()}, key=${e.key}, val=${e.value}]")
    }
    println("Update notifications finished [thread=${Thread.currentThread()}]")
}

val executor = Executors.newSingleThreadExecutor()
executor.submit {
    // use {} closes the cursor when this block exits.
    myCache.query(qry).use { cur ->
        // Iterating over initial query results
        println("Initial query cursor starting...[thread=${Thread.currentThread()}]")
        for (e in cur) {
            println("Cursor value: [thread=${Thread.currentThread()}, key=${e.key}, val=${e.value}]")
        }
        println("Initial query cursor finished [thread=${Thread.currentThread()}]")

        println("Starting holding continuous query cursor open so we can get callback data... [thread=${Thread.currentThread()}]")
        // Never flipped in this snippet, so the loop only ends if the thread is interrupted.
        val shouldBeRunning = true
        while (shouldBeRunning) {
            // hold this thread open forever so the local listener callback can keep processing events
            try {
                Thread.sleep(5000)
            } catch (e: InterruptedException) {
                println("Continuous query sleep interrupted [thread=${Thread.currentThread()}]")
                throw e
            }
        }
        println("Stopping holding continuous query cursor open because we got shutdown signal [thread=${Thread.currentThread()}]")
    }
}

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
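
P.S. To make the goal of the Continuous Query attempt concrete, here is a minimal sketch of the local copy each webserver would keep. It deliberately uses no Ignite types so it runs on its own. LocalMirror, seed, apply and snapshot are hypothetical names, not Ignite API. The assumptions are that the initial-query cursor would feed seed, that the local listener would feed apply, and that a removal reaches the listener as a null value; ordering between the two paths is not handled here.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (not part of Ignite): a thread-safe local copy of a cache.
class LocalMirror<K : Any, V : Any> {
    private val data = ConcurrentHashMap<K, V>()

    // Intended to be called once per entry returned by the initial query.
    fun seed(key: K, value: V) {
        data[key] = value
    }

    // Intended to be called from the local listener.
    // Assumption: a null value means the entry was removed on the server.
    fun apply(key: K, value: V?) {
        if (value == null) data.remove(key) else data[key] = value
    }

    // Point-in-time copy for readers that need a stable view.
    fun snapshot(): Map<K, V> = HashMap(data)
}

fun main() {
    val mirror = LocalMirror<Int, String>()
    mirror.seed(1, "a")        // from the initial query
    mirror.seed(2, "b")
    mirror.apply(2, "b2")      // update notification
    mirror.apply(3, "c")       // new entry notification
    mirror.apply(1, null)      // removal notification (assumed shape)
    println(mirror.snapshot())
}
```

In the code above, the two println loops are where seed and apply would be called.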
