Hi,

I'm wrestling with Continuous Queries. I'm successfully writing data into
Ignite via JDBC; now I want to do a Continuous Query from a client app as
I'm writing that data. I got past several issues by setting
'peerClassLoadingEnabled', using binary objects, and implementing my local
listener and remote filter as static nested classes rather than lambdas.
Now I have an app that executes with no errors, and loads some initial
data, but it doesn't get any notifications via a Continuous Query.

Here's my app:

public class Main {
  public static class LocalListener<K, V> implements
CacheEntryUpdatedListener<K, V> {
    @Override
    public void onUpdated(Iterable evts) throws CacheEntryListenerException
{
      evts.forEach(e -> System.out.println("e=" + e));
    }
  }

  public static class RemoteFilter<Long, BinaryObject> implements
CacheEntryEventSerializableFilter {
    @Override
    public boolean evaluate(CacheEntryEvent evt) throws
CacheEntryListenerException {
      System.out.println("###");
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    Ignition.setClientMode(true);

    System.out.println("Starting Ignite");

    // Connecting to the cluster.
    Ignite ignite =
Ignition.start("/Users/pat/Downloads/apache-ignite-fabric-2.5.0-bin/config/default-config.xml");

    System.out.println("Started Ignite");

    // Getting a reference to an underlying cache created for City table
above.
    IgniteCache<Long, BinaryObject> cache =
ignite.cache("SQL_PUBLIC_CITY").withKeepBinary();

    BinaryObject city = cache.get(1L);

    System.out.println(city);

    QueryCursor<List<?>> query = cache.query(new SqlFieldsQuery("SELECT
name FROM City"));
    System.out.println(query.getAll());

    ContinuousQuery<Long, BinaryObject> qry = new ContinuousQuery<>();

    qry.setLocalListener(new LocalListener<>());

    qry.setRemoteFilter(new RemoteFilter<>());

    try (QueryCursor<Cache.Entry<Long, BinaryObject>> cur =
cache.query(qry)) {
      // Iterating over existing data stored in cache.
      for (Cache.Entry<Long, BinaryObject> e : cur)
        System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
    }
  }
}

And here's default-config.xml, shared by both my server and client

<beans xmlns="http://www.springframework.org/schema/beans";
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd";>
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
      <property name="peerClassLoadingEnabled" value="true"/>
    </bean>
</beans>

I'm doing this to test from sqlline:

CREATE TABLE City (id LONG PRIMARY KEY, name VARCHAR) WITH
"template=replicated";
INSERT INTO City (id, name) VALUES (1, 'Forest Hill');
INSERT INTO City (id, name) VALUES (2, 'Denver');

And my app's output is:

[usual startup stuff]
Started Ignite
SQL_PUBLIC_CITY_13ff453a_0162_4c9a_a224_699fbf252790 [idHash=1642017078,
hash=1261261831, NAME=Forest Hill]
[[Forest Hill], [Denver]]

I add another city in sqlline, but I get no output in my app.

Any ideas?

Cheers,

Pat

--

Pat Patterson | Technical Director | http://about.me/patpatterson

Reply via email to