Hi Val, Here's the background: I'm thinking about creating message pipelines right in ignite cluster. it's a nightmare to save data in a centralized cache and run everything on it, so my whole idea is to guarantee *none-blocking process* and scalable consumers.
1. (producer) put real-time stream into internal message pipeline 2. (consumer) top N algorithm process 3. (consumer) cross-cache replication <http://apache-ignite-users.70518.x6.nabble.com/file/t1585/topitem.jpg> //Topic based message receiver IgniteMessaging msg = ignite.message(ignite.cluster().forRemotes()); msg.localListen("Item", (nodeId, message) -> { this.topitems.Process((AdobeItem) message); return true; }); //algorithm class public class AdobeTopViewedItems { private static Ignite ignite; private static IgniteCache<String,AdobeItem> cacheItems; private Map<String, AdobeItem> ListTopN = new LinkedHashMap<String, AdobeItem>(); private AdobeItem smallest=null; private Integer TopN=Integer.parseInt(System.getProperty("TopN")); private static WebsocketUtil socket; public AdobeTopViewedItems(String accountid) { ignite=Ignition.ignite(); cacheItems = ignite.cache("Items"); Initialize(accountid); socket = new WebsocketUtil("s7biapp10",9001,"ItemChannel"); } public Map<String, AdobeItem> Get(){ return ListTopN; } public void Initialize(String accountid){ this.ListTopN=load(accountid); } public Map<String, AdobeItem> load(String accountid){ Map<String, AdobeItem> output =new LinkedHashMap<String, AdobeItem>(); String script="from AdobeItem \n" + "where \n" + "datekey>='"+DateUtil.DateTime2DateString(DateUtil.DateTime2Date(new Date()))+"' \n" + "and accountid='"+accountid+"' \n" + "order by visits desc \n" + "limit "+Integer.toString(TopN)+" \n"; SqlQuery<String, AdobeItem> sql=new SqlQuery<String, AdobeItem>(AdobeItem.class, script); QueryCursor<javax.cache.Cache.Entry<String, AdobeItem>> cursor=cacheItems.query(sql); for (javax.cache.Cache.Entry<String, AdobeItem> row : cursor){ AdobeItem entry=row.getValue(); output.put(entry.key(), entry); } return output; } public void Process(AdobeItem item){ if(this.smallest==null){ this.smallest=item; this.ListTopN.put(item.key(), item); this.ListTopN=sort(ListTopN,false); }else{ if(item.visits>this.smallest.visits){ this.smallest=item; this.ListTopN.put(item.key(), item); this.ListTopN=sort(ListTopN,false); *send(item);* } } } private void send(AdobeItem item){ String message=new Gson().toJson(item); socket.send(message, MessageRoute.server2client.toString()); } private Map<String, AdobeItem> sort(Map<String, AdobeItem> map,Boolean descending) { final Boolean is_descending=descending; List<Entry<String, AdobeItem>> list = new LinkedList<Entry<String, AdobeItem>>(map.entrySet()); Collections.sort(list, new Comparator<Entry<String, AdobeItem>>() { public int compare(Entry<String, AdobeItem> o1, Entry<String, AdobeItem> o2) { if (is_descending) { return o2.getValue().visits.compareTo(o1.getValue().visits); } else { return o2.getValue().visits.compareTo(o1.getValue().visits); } } }); Map<String, AdobeItem> sortedMap = new LinkedHashMap<String, AdobeItem>(); Integer counter=0; for (Entry<String, AdobeItem> entry : list) { sortedMap.put(entry.getKey(), entry.getValue()); counter+=1; smallest=entry.getValue(); if(counter==TopN) break; } list.clear(); list=null; return sortedMap; } } vkulichenko wrote > Sorry, I'm still failing to understand what you're trying to achieve. What > is > the reason to manually maintain a tree structure which is basically an > index? Why not use Ignite indexes that are provided out of the box? > > -Val > > > > -- > Sent from: http://apache-ignite-users.70518.x6.nabble.com/ -- Sent from: http://apache-ignite-users.70518.x6.nabble.com/