Fixed it! I needed to put my thread to sleep while the query was open, like this:
try (QueryCursor<Cache.Entry<Long, BinaryObject>> cur = cityCache.query(qry)) { // Iterating over existing data stored in cache. for (Cache.Entry<Long, BinaryObject> e : cur) System.out.println("key=" + e.getKey() + ", val=" + e.getValue()); boolean done = false; while (!done) { try { Thread.sleep(1000); } catch (InterruptedException e) { done = true; } } } It would be helpful if this, and the need to define the remote filter as a nested static class, was mentioned at https://apacheignite.readme.io/docs/continuous-queries#section-local-listener Cheers, Pat -- Pat Patterson | Technical Director | http://about.me/patpatterson On Wed, Jun 20, 2018 at 8:56 PM Pat Patterson <p...@streamsets.com> wrote: > Hi, > > I'm wrestling with Continuous Queries. I'm successfully writing data into > Ignite via JDBC; now I want to do a Continuous Query from a client app as > I'm writing that data. I got past several issues by setting > 'peerClassLoadingEnabled', using binary objects, and implementing my local > listener and remote filter as static nested classes rather than lambdas. > Now I have an app that executes with no errors, and loads some initial > data, but it doesn't get any notifications via a Continuous Query. > > Here's my app: > > public class Main { > public static class LocalListener<K, V> implements > CacheEntryUpdatedListener<K, V> { > @Override > public void onUpdated(Iterable evts) throws > CacheEntryListenerException { > evts.forEach(e -> System.out.println("e=" + e)); > } > } > > public static class RemoteFilter<Long, BinaryObject> implements > CacheEntryEventSerializableFilter { > @Override > public boolean evaluate(CacheEntryEvent evt) throws > CacheEntryListenerException { > System.out.println("###"); > return true; > } > } > > public static void main(String[] args) throws Exception { > Ignition.setClientMode(true); > > System.out.println("Starting Ignite"); > > // Connecting to the cluster. > Ignite ignite = > Ignition.start("/Users/pat/Downloads/apache-ignite-fabric-2.5.0-bin/config/default-config.xml"); > > System.out.println("Started Ignite"); > > // Getting a reference to an underlying cache created for City table > above. > IgniteCache<Long, BinaryObject> cache = > ignite.cache("SQL_PUBLIC_CITY").withKeepBinary(); > > BinaryObject city = cache.get(1L); > > System.out.println(city); > > QueryCursor<List<?>> query = cache.query(new SqlFieldsQuery("SELECT > name FROM City")); > System.out.println(query.getAll()); > > ContinuousQuery<Long, BinaryObject> qry = new ContinuousQuery<>(); > > qry.setLocalListener(new LocalListener<>()); > > qry.setRemoteFilter(new RemoteFilter<>()); > > try (QueryCursor<Cache.Entry<Long, BinaryObject>> cur = > cache.query(qry)) { > // Iterating over existing data stored in cache. > for (Cache.Entry<Long, BinaryObject> e : cur) > System.out.println("key=" + e.getKey() + ", val=" + e.getValue()); > } > } > } > > And here's default-config.xml, shared by both my server and client > > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=" > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd"> > <!-- > Alter configuration below as needed. > --> > <bean id="grid.cfg" > class="org.apache.ignite.configuration.IgniteConfiguration"> > <property name="peerClassLoadingEnabled" value="true"/> > </bean> > </beans> > > I'm doing this to test from sqlline: > > CREATE TABLE City (id LONG PRIMARY KEY, name VARCHAR) WITH > "template=replicated"; > INSERT INTO City (id, name) VALUES (1, 'Forest Hill'); > INSERT INTO City (id, name) VALUES (2, 'Denver'); > > And my app's output is: > > [usual startup stuff] > Started Ignite > SQL_PUBLIC_CITY_13ff453a_0162_4c9a_a224_699fbf252790 [idHash=1642017078, > hash=1261261831, NAME=Forest Hill] > [[Forest Hill], [Denver]] > > I add another city in sqlline, but I get no output in my app. > > Any ideas? > > Cheers, > > Pat > > -- > > Pat Patterson | Technical Director | http://about.me/patpatterson >