Hi Val,
Here's the background:
I'm thinking about building message pipelines directly inside the Ignite cluster.
Saving all the data in a centralized cache and running everything against it
is a nightmare, so the whole idea is to guarantee *non-blocking processing*
and scalable consumers.

1. (producer) puts the real-time stream into an internal message pipeline
2. (consumer) runs the top-N algorithm
3. (consumer) performs cross-cache replication
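
To make the shape of that pipeline concrete, here is a rough in-process sketch in plain Java. It is only an analogy, not Ignite API: LocalTopic is a hypothetical class, and each consumer gets its own single-threaded worker with an unbounded queue, so publish() hands the message over and returns without waiting for any consumer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-process stand-in for a message topic (not part of Ignite).
public class LocalTopic<T> {
    // One handler plus its own worker thread, so consumers never wait on each other.
    private static final class Subscriber<T> {
        final Consumer<T> handler;
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        Subscriber(Consumer<T> handler) { this.handler = handler; }
    }

    private final List<Subscriber<T>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<T> handler) {
        subscribers.add(new Subscriber<>(handler));
    }

    // Queues the message for every consumer and returns immediately.
    public void publish(T message) {
        for (Subscriber<T> s : subscribers) {
            s.worker.execute(() -> s.handler.accept(message));
        }
    }

    // Stops accepting messages and waits for the queued ones to be processed.
    // Returns false if the timeout elapsed or the caller was interrupted.
    public boolean shutdown(long timeout, TimeUnit unit) {
        for (Subscriber<T> s : subscribers) {
            s.worker.shutdown();
        }
        boolean done = true;
        try {
            for (Subscriber<T> s : subscribers) {
                done &= s.worker.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return done;
    }
}
```

In this analogy the top-N consumer and the replication consumer would each be one subscribe() call, and a slow consumer only grows its own queue instead of stalling the producer.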


<http://apache-ignite-users.70518.x6.nabble.com/file/t1585/topitem.jpg> 

// Topic-based message receiver
        IgniteMessaging msg = ignite.message(ignite.cluster().forRemotes());
        msg.localListen("Item", (nodeId, message) -> {
                this.topitems.Process((AdobeItem) message);
                return true; // returning true keeps the listener registered
        });


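// Algorithm class
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlQuery;

import com.google.gson.Gson;

public class AdobeTopViewedItems {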
        private static Ignite ignite; 
        private static IgniteCache<String,AdobeItem> cacheItems;
        
        private Map<String, AdobeItem> ListTopN = new LinkedHashMap<String, AdobeItem>();
        private AdobeItem smallest=null;
        private Integer TopN=Integer.parseInt(System.getProperty("TopN"));
        private static WebsocketUtil socket;
        public AdobeTopViewedItems(String accountid) {
                ignite=Ignition.ignite();
                cacheItems = ignite.cache("Items");
                Initialize(accountid);
                socket = new WebsocketUtil("s7biapp10",9001,"ItemChannel");
        }
        
        public Map<String, AdobeItem> Get(){
                return ListTopN;
        }
        
        public void Initialize(String accountid){
                this.ListTopN=load(accountid);
        }
        
        
        public Map<String, AdobeItem> load(String accountid){
                Map<String, AdobeItem> output = new LinkedHashMap<String, AdobeItem>();
                // Today's top N items for the account, highest visit count first
                String today = DateUtil.DateTime2DateString(DateUtil.DateTime2Date(new Date()));
                String script = "from AdobeItem \n"
                                + "where \n"
                                + "datekey>='" + today + "' \n"
                                + "and accountid='" + accountid + "' \n"
                                + "order by visits desc \n"
                                + "limit " + Integer.toString(TopN) + " \n";
                SqlQuery<String, AdobeItem> sql = new SqlQuery<String, AdobeItem>(AdobeItem.class, script);
                // QueryCursor is AutoCloseable; close it once the rows are read
                try (QueryCursor<javax.cache.Cache.Entry<String, AdobeItem>> cursor = cacheItems.query(sql)) {
                        for (javax.cache.Cache.Entry<String, AdobeItem> row : cursor) {
                                AdobeItem entry = row.getValue();
                                output.put(entry.key(), entry);
                        }
                }
                return output;
        }
        
        public void Process(AdobeItem item){
                if(this.smallest==null){
                        this.smallest=item;
                        this.ListTopN.put(item.key(), item);
                        this.ListTopN=sort(ListTopN,true);
                }else{
                        if(item.visits>this.smallest.visits){
                                this.smallest=item;
                                this.ListTopN.put(item.key(), item);
                                this.ListTopN=sort(ListTopN,true);
                                // push the new top item to the websocket clients
                                send(item);
                        }
                }
        }

        private void send(AdobeItem item){
                String message=new Gson().toJson(item);
                socket.send(message, MessageRoute.server2client.toString());
        }

        // Sorts the entries by visits, keeps at most TopN of them and
        // remembers the last entry kept in "smallest".
        private Map<String, AdobeItem> sort(Map<String, AdobeItem> map, Boolean descending) {
                final Boolean is_descending=descending;
                List<Entry<String, AdobeItem>> list = new LinkedList<Entry<String, AdobeItem>>(map.entrySet());
                Collections.sort(list, new Comparator<Entry<String, AdobeItem>>() {
                        public int compare(Entry<String, AdobeItem> o1, Entry<String, AdobeItem> o2) {
                                if (is_descending) {
                                        // highest visits first
                                        return o2.getValue().visits.compareTo(o1.getValue().visits);
                                } else {
                                        // lowest visits first
                                        return o1.getValue().visits.compareTo(o2.getValue().visits);
                                }
                        }
                });
                Map<String, AdobeItem> sortedMap = new LinkedHashMap<String, AdobeItem>();
                // primitive counter so that == compares values, not Integer references
                int counter=0;
                for (Entry<String, AdobeItem> entry : list) {
                        sortedMap.put(entry.getKey(), entry.getValue());
                        counter+=1;
                        smallest=entry.getValue();
                        if(counter==TopN) break;
                }
                return sortedMap;
        }
}
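
As a side note, re-sorting the whole map on every message could probably be avoided with a bounded min-heap. The following is a minimal sketch of that alternative technique, not the class above: TopNTracker and its nested Item (just a key and a visit count standing in for AdobeItem) are hypothetical names, and the Ignite and websocket parts are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: keeps the N items with the highest visit counts.
public class TopNTracker {
    // Hypothetical stand-in for AdobeItem: only a key and a visit count.
    public static final class Item {
        public final String key;
        public final long visits;
        public Item(String key, long visits) {
            this.key = key;
            this.visits = visits;
        }
    }

    private final int topN;
    // Min-heap ordered by visits: the head is always the smallest item kept.
    private final PriorityQueue<Item> heap;

    public TopNTracker(int topN) {
        if (topN <= 0) {
            throw new IllegalArgumentException("topN must be positive");
        }
        this.topN = topN;
        this.heap = new PriorityQueue<>(topN, Comparator.comparingLong((Item i) -> i.visits));
    }

    // Returns true when the item entered the top N, which is the point
    // where a notification could be sent to clients.
    public boolean process(Item item) {
        if (heap.size() < topN) {
            heap.add(item);
            return true;
        }
        if (item.visits > heap.peek().visits) {
            heap.poll();      // drop the current smallest
            heap.add(item);   // O(log N) instead of a full re-sort
            return true;
        }
        return false;
    }

    // Snapshot of the tracked items, highest visit count first.
    public List<Item> topDescending() {
        List<Item> out = new ArrayList<>(heap);
        out.sort(Comparator.comparingLong((Item i) -> i.visits).reversed());
        return out;
    }
}
```

This sketch treats every message as a distinct item. Updating the count of a key that is already tracked, which the map-based version handles through put(), would need an extra key-to-item map on top of the heap.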




vkulichenko wrote
> Sorry, I'm still failing to understand what you're trying to achieve. What is
> the reason to manually maintain a tree structure which is basically an
> index? Why not use Ignite indexes that are provided out of the box?
> 
> -Val
> 
> 
> 
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/


