Hi, sorry for the late reply. The CQ is set up in the service's execute()
method, not in init(), but we do set an initialQuery on the CQ to scan for
existing entries that match the filter. Below is a snapshot of one of our many
Ignite services, each of which processes a trade when it moves to a
particular status.

As you can see, I have added logging to the remote filter predicate, but these
logs are not printed when a trade gets stuck at a particular status. So I
assume the remote filter is not picking up the events it is supposed to track.

public enum TradeStatus {
        NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS
}


/**
 * Ignite Service which picks up CHANGED trade delivery items
 */
public class ChangedTradeService implements Service{

        @IgniteInstanceResource
        private transient Ignite ignite;
        private transient IgniteCache<Long, Trade> tradeCache;
        private transient QueryCursor<Cache.Entry<Long, Trade>> cursor;

        @Override
        public void init(ServiceContext serviceContext) throws Exception {
                tradeCache = ignite.cache("tradeCache");
        }

        @Override
        public void execute(ServiceContext serviceContext) throws Exception {
                ContinuousQuery<Long, Trade> query = new ContinuousQuery<>();
                query.setLocalListener((CacheEntryUpdatedListenerAsync<Long, Trade>)
                                events -> events.forEach(event -> process(event.getValue())));
                query.setRemoteFilterFactory(factoryOf(checkStatus(status)));
                query.setInitialQuery(new ScanQuery<>(checkStatusPredicate(status)));
                // Assign to the field (not a local variable) so that cancel() can close the cursor.
                cursor = tradeCache.query(query);
                cursor.forEach(entry -> process(entry.getValue()));
        }

        private void process(Trade item) {
                log.info("transition started for trade id :" + item.getPkey());
                // Move the trade to the next state (e.g. SUCCESS); the next service
                // (which contains a CQ looking for SUCCESS status) will pick it up
                // for further processing, and so on.
                log.info("transition finished for trade id :" + item.getPkey());
        }

        @Override
        public void cancel(ServiceContext serviceContext) {
                // Guard against cancel() being called before the query was started.
                if (cursor != null)
                        cursor.close();
        }
        
        static CacheEntryEventFilterAsync<Long, Trade> checkStatus(TradeStatus status) {
                return event -> event.getValue() != null
                                && checkStatusPredicate(status).apply(event.getKey(), event.getValue());
        }
        
        // The predicate receives the cached Trade as its value, so it is typed on Trade.
        static IgniteBiPredicate<Long, Trade> checkStatusPredicate(TradeStatus status) {
                return (k, v) -> {
                        LOG.debug("Status checking for: {} Event value: {} isStatus: {}",
                                        status, v, v.getStatus() == status);
                        return v.getStatus() == status;
                };
        }
}
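
To rule out the predicate logic itself, here is a stripped-down sketch of the
status check that runs without Ignite. Note the assumptions: the Trade class
below is a simplified stand-in (not our real class), java.util.function.BiPredicate
stands in for IgniteBiPredicate, and the null check is moved into the predicate
purely for illustration.

```java
import java.util.function.BiPredicate;

public class StatusFilterSketch {

    enum TradeStatus { NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS }

    // Hypothetical stand-in for the real Trade class: only key and status.
    static final class Trade {
        private final long pkey;
        private final TradeStatus status;

        Trade(long pkey, TradeStatus status) {
            this.pkey = pkey;
            this.status = status;
        }

        long getPkey() { return pkey; }

        TradeStatus getStatus() { return status; }
    }

    // Same check as checkStatusPredicate above, with a null guard on the value.
    static BiPredicate<Long, Trade> checkStatusPredicate(TradeStatus status) {
        return (k, v) -> v != null && v.getStatus() == status;
    }

    public static void main(String[] args) {
        BiPredicate<Long, Trade> changed = checkStatusPredicate(TradeStatus.CHANGED);

        // A trade in CHANGED status matches the filter.
        System.out.println(changed.test(1L, new Trade(1L, TradeStatus.CHANGED)));
        // A trade in any other status does not match.
        System.out.println(changed.test(2L, new Trade(2L, TradeStatus.SUCCESS)));
        // A null value (e.g. a removal event) does not match.
        System.out.println(changed.test(3L, null));
    }
}
```

Run on its own, this only confirms that the comparison behaves as expected for
matching, non-matching, and null values; it says nothing about whether the
filter is actually invoked on the remote nodes.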




--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Continuous-Query-remote-listener-misses-some-events-or-respond-really-late-tp12338p13476.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
