Hello! We finally found a bug in cache.c. The new version is attached. Everyone who has problems with splitter crashes is welcome to test it. Please give us feedback!
#include "udm_config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <dirent.h>

#include "udm_common.h"
#include "udm_utils.h"
#include "udm_charset.h"
#include "udm_spell.h"
#include "udm_cache.h"
#include "udm_crc32.h"
#include "udm_indexer.h"
#include "udm_db.h"
#include "udm_boolean.h"
#include "udm_searchtool.h"
#include "udm_agent.h"
#include "udm_xmalloc.h"
#include "udm_stopwords.h"
#include "udm_proto.h"

#ifdef HAVE_WINSOCK_H
#include <winsock.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifndef INADDR_NONE
#define INADDR_NONE ((unsigned long) -1)
#endif

/* When defined, UdmFindCache() reports timings and file access on stderr. */
#define DEBUG_SEARCH 1

/*
 * Open a TCP connection to hostname:port.
 *
 * hostname may be a dotted-quad address or a name resolved with
 * gethostbyname(). Returns the connected socket descriptor, or:
 *   UDM_NET_ERROR        - port is 0 or no socket could be created,
 *   UDM_NET_CANT_RESOLVE - the name does not resolve,
 *   UDM_NET_CANT_CONNECT - connect() failed.
 * `timeout` is accepted for interface compatibility but is not used.
 */
static int open_host(char *hostname, int port, int timeout)
{
    int net;
    struct hostent *host;
    struct sockaddr_in sa_in;

    (void)timeout;

    bzero((char *)&sa_in, sizeof(sa_in));
    if (!port)
        return UDM_NET_ERROR;
    sa_in.sin_port = htons((u_short)port);

    if ((sa_in.sin_addr.s_addr = inet_addr(hostname)) != INADDR_NONE) {
        sa_in.sin_family = AF_INET;
    } else {
        host = gethostbyname(hostname);
        /* Refuse addresses that would not fit into sin_addr. */
        if (!host || host->h_length > (int)sizeof(sa_in.sin_addr))
            return UDM_NET_CANT_RESOLVE;
        sa_in.sin_family = host->h_addrtype;
        memcpy(&sa_in.sin_addr, host->h_addr, (size_t)host->h_length);
    }

    net = socket(AF_INET, SOCK_STREAM, 0);
    if (net < 0)
        return UDM_NET_ERROR;
    if (connect(net, (struct sockaddr *)&sa_in, sizeof(sa_in))) {
        closesocket(net);   /* do not leak the descriptor on failure */
        return UDM_NET_CANT_CONNECT;
    }
    return net;
}

/***********************************/

#define LOGDIR  "raw"        /* per-indexer NNN.wrd / NNN.del logs       */
#define TREEDIR "tree"       /* final cache files                        */
#define SPLDIR  "splitter"   /* per-LOGNO NNN.log files and del.log      */

/**********************************/

/*
 * Cache file table-of-contents entry: all words for one (wrd_id, weight)
 * pair, stored as `len` bytes of UDM_CACHEWORD records starting `pos`
 * bytes after the end of the table.
 */
typedef struct udm_cache_table {
    int wrd_id;
    int weight;
    int pos;
    int len;
} UDM_CACHETABLE;

/* Cache file header; followed on disk by `ntables` UDM_CACHETABLE
   entries and then by the UDM_CACHEWORD data block. */
typedef struct udm_cache_hheader {
    int ntables;
    int version;
} UDM_CACHEHEADER;

/* One stored occurrence; the optional fields depend on build options. */
typedef struct udm_cacheword_struct {
    int url_id;
#ifdef UDM_STORE_CACHE_WRDPOS
    int wrd_pos;
#endif
#ifdef UDM_STORE_CACHE_SITEID
    int site_id;
#endif
#ifdef UDM_STORE_CACHE_CATEGORY
    int category;
#endif
#ifdef UDM_STORE_CACHE_TAG
    int tag;
#endif
} UDM_CACHEWORD;

/*
 * Convert a hex category/tag string into a packed 32-bit value and mask.
 *
 * Only the first 10 characters are used; shorter (or NULL) strings are
 * right-padded with '0'. They are parsed as five 2-digit hex levels;
 * each non-zero level sets its bit field in *mask.
 */
static void UdmDecodeCatStr(const char *cat_str, int *cat, int *mask)
{
    unsigned int t[5] = {0, 0, 0, 0, 0};   /* stays 0 if sscanf stops early */
    char str[11];
    size_t len = cat_str ? strlen(cat_str) : 0;

    if (len > 10)
        len = 10;
    memset(str, '0', 10);
    if (len)
        memcpy(str, cat_str, len);
    str[10] = '\0';
    sscanf(str, "%02x%02x%02x%02x%02x", t + 0, t + 1, t + 2, t + 3, t + 4);

    /* Unsigned constants: 0x7F << 25 does not fit in a signed int. */
    *mask = (int)((t[0] ? 0x7Fu << 25 : 0u) |
                  (t[1] ? 0x7Fu << 18 : 0u) |
                  (t[2] ? 0x3Fu << 12 : 0u) |
                  (t[3] ? 0x3Fu << 6 : 0u) |
                  (t[4] ? 0x3Fu : 0u));
    *cat = (int)((t[0] << 25) | (t[1] << 18) | (t[2] << 12) | (t[3] << 6) | t[4]);
}

/*************************** Sort functions **************************/

/* Order UDM_SEARCHWORDs by url_id (overflow-safe comparison). */
static int cmpurlid(const void *s1, const void *s2)
{
    int a = ((const UDM_SEARCHWORD *)s1)->url_id;
    int b = ((const UDM_SEARCHWORD *)s2)->url_id;

    return (a > b) - (a < b);
}

/* Order UDM_LOGWORDs by (wrd_id, url_id) ascending, compared as unsigned,
   and then by stamp descending so the newest record of a pair comes first. */
static int cmplog(const void *s1, const void *s2)
{
    unsigned int n1, n2;

    n1 = ((const UDM_LOGWORD *)s1)->wrd_id;
    n2 = ((const UDM_LOGWORD *)s2)->wrd_id;
    if (n1 == n2) {
        n1 = ((const UDM_LOGWORD *)s1)->url_id;
        n2 = ((const UDM_LOGWORD *)s2)->url_id;
        if (n1 == n2) {
            n1 = ((const UDM_LOGWORD *)s2)->stamp;
            n2 = ((const UDM_LOGWORD *)s1)->stamp;
        }
    }
    if (n1 < n2) return -1;
    if (n1 > n2) return 1;
    return 0;
}

/* Order UDM_LOGDELs by url_id ascending (unsigned), newest stamp first. */
static int cmpurldellog(const void *s1, const void *s2)
{
    unsigned int n1, n2;

    n1 = ((const UDM_LOGDEL *)s1)->url_id;
    n2 = ((const UDM_LOGDEL *)s2)->url_id;
    if (n1 == n2) {
        n1 = ((const UDM_LOGDEL *)s2)->stamp;
        n2 = ((const UDM_LOGDEL *)s1)->stamp;
    }
    if (n1 < n2) return -1;
    if (n1 > n2) return 1;
    return 0;
}

/* Order UDM_LOGWORDs by (wrd_id, weight, url_id) as signed ints: the
   order in which words are laid out inside a cache file. */
static int cmpcache(const void *s1, const void *s2)
{
    int n1, n2;

    n1 = ((const UDM_LOGWORD *)s1)->wrd_id;
    n2 = ((const UDM_LOGWORD *)s2)->wrd_id;
    if (n1 == n2) {
        n1 = ((const UDM_LOGWORD *)s1)->weight;
        n2 = ((const UDM_LOGWORD *)s2)->weight;
        if (n1 == n2) {
            n1 = ((const UDM_LOGWORD *)s1)->url_id;
            n2 = ((const UDM_LOGWORD *)s2)->url_id;
        }
    }
    if (n1 < n2) return -1;
    if (n1 > n2) return 1;
    return 0;
}

/* Order cache tables by data length, longest first. */
static int cmptable(const void *t1, const void *t2)
{
    int a = ((const UDM_CACHETABLE *)t1)->len;
    int b = ((const UDM_CACHETABLE *)t2)->len;

    return (b > a) - (b < a);
}

/******* These two functions open/close the word and delete logs ******/

/*
 * Open the cache logs for writing.
 *
 * Without Conf->logd_addr, append-mode "<UDM_VAR_DIR>/raw/<stamp>.wrd" and
 * ".del" files are opened; both use the same <stamp> so the pre-splitter
 * can pair them by name. Otherwise a TCP connection to cachelogd at
 * "host[:port]" is opened. Returns IND_OK, or IND_ERROR with errmsg set.
 */
int UdmOpenCache(UDM_ENV *Conf, char *errmsg)
{
    if (!Conf->logd_addr) {
        char fname[BUFSIZ];
        int stamp = (int)time(NULL);   /* one stamp for both file names */

        sprintf(fname, "%s/raw/%d.wrd", UDM_VAR_DIR, stamp);
        if ((Conf->wrd_fd = open(fname, O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY, UDM_IWRITE)) < 0) {
            sprintf(errmsg, "Can't open word log file: '%s'", strerror(errno));
            return IND_ERROR;
        }
        sprintf(fname, "%s/raw/%d.del", UDM_VAR_DIR, stamp);
        if ((Conf->del_fd = open(fname, O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY, UDM_IWRITE)) < 0) {
            sprintf(errmsg, "Can't open del log file: '%s'", strerror(errno));
            close(Conf->wrd_fd);
            Conf->wrd_fd = 0;
            return IND_ERROR;
        }
    } else {
        char *host, *prt;
        int port = UDM_LOGD_PORT;

        if (!(host = strdup(Conf->logd_addr))) {
            sprintf(errmsg, "Can't allocate memory for cachelogd address");
            return IND_ERROR;
        }
        if ((prt = strchr(host, ':'))) {
            *prt = '\0';
            if (!(port = atoi(prt + 1)))
                port = UDM_LOGD_PORT;
        }
        if ((Conf->logd_fd = open_host(host, port, 60)) < 0) {
            sprintf(errmsg, "Can't connect to cachelogd at %s:%d", host, port);
            UDM_FREE(host);
            Conf->logd_fd = 0;
            return IND_ERROR;
        }
        UDM_FREE(host);
    }
    return IND_OK;
}

/* Close whatever UdmOpenCache() opened; descriptors are reset to 0 so a
   second call is harmless. */
void UdmCloseCache(UDM_ENV *Conf)
{
    if (Conf->del_fd > 0) {
        close(Conf->del_fd);
        Conf->del_fd = 0;
    }
    if (Conf->wrd_fd > 0) {
        close(Conf->wrd_fd);
        Conf->wrd_fd = 0;
    }
    if (Conf->logd_fd > 0) {
        closesocket(Conf->logd_fd);
        Conf->logd_fd = 0;
    }
}

/*
 * Binary-search a delete log sorted with cmpurldellog() for url_id.
 * Returns the stamp of the first matching record (the newest one), or 0
 * when url_id is absent.
 */
static int PresentInDelLog(UDM_LOGDEL *buf, int buf_count, const int url_id)
{
    int lo = 0;
    int hi = buf_count;

    /* lower bound; url_ids compared as unsigned, like cmpurldellog() */
    while (lo < hi) {
        int mid = lo + (hi - lo) / 2;

        if ((unsigned int)buf[mid].url_id < (unsigned int)url_id)
            lo = mid + 1;
        else
            hi = mid;
    }
    if (lo < buf_count && buf[lo].url_id == url_id)
        return buf[lo].stamp;
    return 0;
}

/*
 * Remove from the cache file `fname` every word whose url_id appears in
 * the delete log, then rewrite the file via "<fname>.tmp" + rename().
 * The file is unlinked when no words are left.
 *
 * Surviving words are copied into a separate, globally compacted buffer,
 * so every new table entry's pos matches the data actually written.
 * Returns 0 on success, 1 when the file cannot be opened, read or
 * rewritten; the original file is left untouched in that case.
 */
static int UdmDeleteFromCache(char *fname, UDM_LOGDEL *del_buf, int del_count)
{
    int oldfd, newfd;
    int i;
    int rc = 1;
    int ok;
    UDM_CACHEHEADER header;
    UDM_CACHEHEADER newheader;
    UDM_CACHETABLE *table = NULL;
    UDM_CACHETABLE *newtable = NULL;
    UDM_CACHEWORD *cache = NULL;
    UDM_CACHEWORD *kept = NULL;
    size_t tsize, nwords, dlen;
    size_t data_len = 0;
    size_t nkept = 0;
    char tmpname[UDMSTRSIZ] = "";

    if ((oldfd = open(fname, O_RDONLY|UDM_BINARY)) < 0)
        return 1;

    if ((long)read(oldfd, &header, sizeof(header)) != (long)sizeof(header)
        || header.ntables < 0
        || (size_t)header.ntables > ((size_t)-1) / sizeof(UDM_CACHETABLE))
        goto out;

    tsize = (size_t)header.ntables * sizeof(UDM_CACHETABLE);
    table = (UDM_CACHETABLE *)malloc(tsize ? tsize : 1);
    newtable = (UDM_CACHETABLE *)malloc(tsize ? tsize : 1);
    if (!table || !newtable)
        goto out;
    if (tsize && (long)read(oldfd, table, tsize) != (long)tsize)
        goto out;

    for (i = 0; i < header.ntables; i++) {
        if (table[i].pos < 0 || table[i].len < 0)
            goto out;   /* corrupt table: leave the file alone */
        data_len += (size_t)table[i].len;
    }
    cache = (UDM_CACHEWORD *)malloc(data_len ? data_len : 1);
    kept = (UDM_CACHEWORD *)malloc(data_len ? data_len : 1);
    if (!cache || !kept)
        goto out;
    if (data_len && (long)read(oldfd, cache, data_len) != (long)data_len)
        goto out;
    close(oldfd);
    oldfd = -1;

    /* Copy surviving words table by table into one contiguous buffer. */
    nwords = data_len / sizeof(UDM_CACHEWORD);
    newheader.ntables = 0;
    newheader.version = header.version;
    for (i = 0; i < header.ntables; i++) {
        size_t first = (size_t)table[i].pos / sizeof(UDM_CACHEWORD);
        size_t num = (size_t)table[i].len / sizeof(UDM_CACHEWORD);
        size_t start = nkept;
        size_t j;

        if (first > nwords || num > nwords - first)
            continue;   /* entry points outside the data block */
        for (j = first; j < first + num; j++) {
            if (!PresentInDelLog(del_buf, del_count, cache[j].url_id))
                kept[nkept++] = cache[j];
        }
        if (nkept > start) {
            UDM_CACHETABLE *nt = &newtable[newheader.ntables++];

            nt->wrd_id = table[i].wrd_id;
            nt->weight = table[i].weight;   /* must be preserved for search */
            nt->pos = (int)(start * sizeof(UDM_CACHEWORD));
            nt->len = (int)((nkept - start) * sizeof(UDM_CACHEWORD));
        }
    }

    if (newheader.ntables == 0) {
        unlink(fname);
        rc = 0;
        goto out;
    }

    /* Write to a temporary file and only replace the original on success. */
    sprintf(tmpname, "%s.tmp", fname);
    if ((newfd = open(tmpname, O_WRONLY|O_CREAT|O_TRUNC|UDM_BINARY, UDM_IWRITE)) < 0)
        goto out;
    tsize = (size_t)newheader.ntables * sizeof(UDM_CACHETABLE);
    dlen = nkept * sizeof(UDM_CACHEWORD);
    ok = (long)write(newfd, &newheader, sizeof(newheader)) == (long)sizeof(newheader)
      && (long)write(newfd, newtable, tsize) == (long)tsize
      && (long)write(newfd, kept, dlen) == (long)dlen;
    if (close(newfd) != 0)
        ok = 0;
    if (ok && rename(tmpname, fname) == 0)
        rc = 0;
    else
        unlink(tmpname);

out:
    if (oldfd >= 0)
        close(oldfd);
    free(table);
    free(newtable);
    free(cache);
    free(kept);
    return rc;
}

/***** Write a marker that the old content of url_id should be deleted ****/

/*
 * Record that the old content of url_id must be removed from the cache.
 * Writes a UDM_LOGDEL record to the del log, or a zero-word UDM_LOGD_CMD
 * to cachelogd. Returns IND_OK, or IND_ERROR with the agent's error
 * message set.
 */
int UdmDeleteURLFromCache(UDM_AGENT *Indexer, int url_id)
{
    int sent;

    if (!Indexer->Conf->logd_addr) {
        UDM_LOGDEL logdel;

        logdel.stamp = time(NULL);
        logdel.url_id = url_id;
        sent = write(Indexer->Conf->del_fd, &logdel, sizeof(logdel));
        if (sent != (int)sizeof(logdel)) {
            sprintf(UdmAgentErrorMsg(Indexer), "Can't write to del log: %s", strerror(errno));
            return IND_ERROR;
        }
    } else {
        UDM_LOGD_CMD cmd;

        cmd.stamp = time(NULL);
        cmd.url_id = url_id;
        cmd.site_id = 0;
        cmd.tag = 0;
        cmd.category = 0;
        cmd.cmd = 0;
        cmd.nwords = 0;
        sent = send(Indexer->Conf->logd_fd, &cmd, sizeof(cmd), 0);
        if (sent != (int)sizeof(cmd)) {
            sprintf(UdmAgentErrorMsg(Indexer), "Can't write to logd: %s", strerror(errno));
            return IND_ERROR;
        }
    }
    return IND_OK;
}

/******** Write a document's words into the cache logs **************/

/*
 * Log the indexer's current words for url_id.
 *
 * A delete marker and the new words share one timestamp, so that the
 * splitter drops the older content of url_id but keeps these words. In
 * local mode a del record is written, followed (when there are words) by a
 * UDM_LOGD_CMD header and nwords UDM_LOGD_WRD records. In logd mode the
 * header and each word are sent to cachelogd.
 * Returns IND_OK, or IND_ERROR with the agent's error message set.
 */
int UdmStoreWordsCache(UDM_AGENT *Indexer, int url_id, int site_id, const char *catstr, const char *tagstr)
{
    int category, tag, mask, sent;
    size_t i;
    UDM_LOGD_CMD cmd;

    /* Convert category & tag strings into 32-bit numbers (masks unused). */
    UdmDecodeCatStr(catstr, &category, &mask);
    UdmDecodeCatStr(tagstr, &tag, &mask);

    /* Mark the old content as deleted every time, even when the URL is
       indexed for the first time. */
    cmd.stamp = time(NULL);
    cmd.url_id = url_id;
    cmd.site_id = site_id;
    cmd.tag = tag;
    cmd.category = category;
    cmd.cmd = 0;
    cmd.nwords = Indexer->nwords;

    if (!Indexer->Conf->logd_addr) {
        UDM_LOGDEL logdel;
        UDM_LOGD_WRD *words;
        size_t nbytes;

        logdel.stamp = cmd.stamp;
        logdel.url_id = cmd.url_id;
        sent = write(Indexer->Conf->del_fd, &logdel, sizeof(logdel));
        if (sent != (int)sizeof(logdel)) {
            sprintf(UdmAgentErrorMsg(Indexer), "Can't write to del log: %s", strerror(errno));
            return IND_ERROR;
        }

        if (Indexer->nwords > 0) {
            sent = write(Indexer->Conf->wrd_fd, &cmd, sizeof(cmd));
            if (sent != (int)sizeof(cmd)) {
                sprintf(UdmAgentErrorMsg(Indexer), "Can't write to word log: %s", strerror(errno));
                return IND_ERROR;
            }
            nbytes = Indexer->nwords * sizeof(UDM_LOGD_WRD);
            words = (UDM_LOGD_WRD *)malloc(nbytes);
            if (!words) {
                sprintf(UdmAgentErrorMsg(Indexer), "Can't allocate memory for word buf: %s", strerror(errno));
                return IND_ERROR;
            }
            for (i = 0; i < Indexer->nwords; i++) {
                /* low 16 bits of count: weight; high bits: position */
                words[i].weight = Indexer->Word[i].count & 0xFFFF;
                words[i].crc = UdmStrCRC32(Indexer->Word[i].word);
#ifdef UDM_STORE_CACHE_WRDPOS
                words[i].wrd_pos = Indexer->Word[i].count >> 16;
#endif
            }
            sent = write(Indexer->Conf->wrd_fd, words, nbytes);
            if (sent < 0 || (size_t)sent != nbytes) {
                sprintf(UdmAgentErrorMsg(Indexer), "Can't write to word log: %s", strerror(errno));
                free(words);
                return IND_ERROR;
            }
            free(words);
        }
    } else {
        UDM_LOGD_WRD wrd;

        sent = send(Indexer->Conf->logd_fd, &cmd, sizeof(cmd), 0);
        if (sent != (int)sizeof(cmd)) {
            sprintf(UdmAgentErrorMsg(Indexer), "Can't write to logd: %s", strerror(errno));
            return IND_ERROR;
        }
        for (i = 0; i < Indexer->nwords; i++) {
            wrd.weight = Indexer->Word[i].count & 0xFFFF;
            wrd.crc = UdmStrCRC32(Indexer->Word[i].word);
#ifdef UDM_STORE_CACHE_WRDPOS
            wrd.wrd_pos = Indexer->Word[i].count >> 16;
#endif
            sent = send(Indexer->Conf->logd_fd, &wrd, sizeof(wrd), 0);
            if (sent != (int)sizeof(wrd)) {
                sprintf(UdmAgentErrorMsg(Indexer), "Can't write to logd: %s", strerror(errno));
                return IND_ERROR;
            }
        }
    }
    return IND_OK;
}

/***************** Split one log into
cache *****************/ /* This function removes duplicate (wrd_id,url_id) pairs using timestamps. Most new pairs will remane. */ static int RemoveURLDelDups(UDM_LOGDEL *words, int n){ int i,j; j=0; for(i=1;i<n;i++){ if(words[j].url_id!=words[i].url_id){ j++; if(i!=j)words[j]=words[i]; } } return(j+1); } /* This function removes non-fresh words from UDM_LOGWORD array using timestamps from delete log. Only those (word_id,url_id) pairs which are newer than (url_id) from delete log will remane in "words" array */ static size_t RemoveOldWords(UDM_LOGWORD *words, size_t n,UDM_LOGDEL *del, int del_count){ size_t i,j; j=0; for(i=1;i<n;i++){ if((words[j].wrd_id!=words[i].wrd_id) ||(words[j].url_id!=words[i].url_id) #ifdef UDM_STORE_CACHE_WRDPOS ||(words[j].wrd_pos!=words[i].wrd_pos) #endif ){ if(PresentInDelLog(del,del_count,words[i].url_id)<=words[i].stamp){ j++; if(i!=j){ words[j]=words[i]; } } } } /* Check first word */ if(j==0){ if(PresentInDelLog(del,del_count,words[0].url_id)>words[0].stamp) return(0); } return(j+1); } int UdmClearCacheTree(){ int i,j,k; char fname[UDMSTRSIZ]; for(i=0;i<256;i++) for(j=0;j<16;j++) for(k=0;k<256;k++){ sprintf(fname,"%s%c%s%c%02X%c%01X%c%02X%01X%02X000",UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH,i,UDMSLASH,j,UDMSLASH,i,j,k); printf("%s\n",fname); unlink(fname); } return(0); } int UdmSplitCacheLog(int log){ int fd,dd,bytes,i; size_t bufsize; char fname[UDMSTRSIZ]=""; char dname[UDMSTRSIZ]=""; char treedir[UDMSTRSIZ]=""; UDM_LOGWORD * buf; UDM_LOGDEL * del_buf; int del_count; int done[256]; struct stat sb; /* Build path to tree dir */ sprintf(treedir,"%s%c%s%c",UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH); /* Open log file */ sprintf(fname,"%s%c%s%c%03X.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH,log); if((fd=open(fname,O_RDONLY|UDM_BINARY))<0){ return(IND_ERROR); } /* Open del log file */ sprintf(dname,"%s%c%s%cdel.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH); if((dd=open(dname,O_RDONLY|UDM_BINARY))<0){ return(IND_ERROR); } /* Allocate del buffer */ 
fstat(dd, &sb); del_buf=(UDM_LOGDEL*)malloc((size_t)sb.st_size); del_count=read(dd,del_buf,(size_t)sb.st_size)/sizeof(UDM_LOGDEL); close(dd); /* Remove duplicates URLs in DEL log */ /* Keep only oldest records for each URL */ qsort(del_buf,(size_t)del_count,sizeof(UDM_LOGDEL),cmpurldellog); del_count=RemoveURLDelDups(del_buf,del_count); /* Allocate buffer for log. FIXME!!! */ bufsize=sizeof(UDM_LOGWORD)*1024*1024*1; buf=(UDM_LOGWORD*)malloc(bufsize); /* Zero all Markers */ bzero(done,sizeof(done)); while((bytes=read(fd,buf,bufsize))){ int new,prev,first=0; size_t n; n=bytes/sizeof(UDM_LOGWORD); qsort(buf,n,sizeof(UDM_LOGWORD),cmplog); n=RemoveOldWords(buf,n,del_buf,del_count); prev=((unsigned int)buf[0].wrd_id); for(i=1;i<n+1;i++){ if(i<n)new=((unsigned int)buf[i].wrd_id); else new=prev+1; /* New cache file ? */ if((i==n)||((((unsigned int)prev)>>12)!=(((unsigned int)new)>>12))){ if(first!=i){ int newfd,oldfd; char scrc[16]; char dirname[UDMSTRSIZ]=""; char tmpname[UDMSTRSIZ]=""; int r,w,t,pos,count=0; UDM_CACHEHEADER header; UDM_CACHETABLE *table; size_t table_size; UDM_CACHEWORD cache[4096]; UDM_LOGWORD * logwords = NULL; int old_words=0; /* Build cache dir and file name */ sprintf(scrc,"%08X",((unsigned int)prev)&0xFFFFF000); sprintf(dirname,"%s%c%c%c%c%c",treedir,scrc[0],scrc[1],UDMSLASH,scrc[2],UDMSLASH); strcpy(tmpname,dirname); UdmBuild(tmpname,0755); sprintf(fname,"%s%s",dirname,scrc); /*printf("%s new: %08X prev: %08X first: %d i: %d\n",fname,new,prev,first,i);*/ done[(prev>>12)&0x000000FF]=1; /* Read OLD words from tree */ if((oldfd=open(fname,O_RDONLY|UDM_BINARY))>=0){ int j; /*printf("Read old: %s\n",fname);*/ read(oldfd,&header,sizeof(header)); table=(UDM_CACHETABLE*)malloc(header.ntables*sizeof(UDM_CACHETABLE)); read(oldfd,table,header.ntables*sizeof(UDM_CACHETABLE)); for(w=0;w<header.ntables;w++){ int c=0; int num=table[w].len/sizeof(UDM_CACHEWORD); while((r=(num-c))>0){ int add=0; if(r>4096)r=4096; read(oldfd,cache,r*sizeof(UDM_CACHEWORD)); 
if(!count)logwords=(UDM_LOGWORD*)malloc((count+r)*sizeof(UDM_LOGWORD)); else logwords=(UDM_LOGWORD*)realloc(logwords,(count+r)*sizeof(UDM_LOGWORD)); for(j=0;j<r;j++){ /* Add only those OLD words which */ /* do not present in DEL log */ if(!PresentInDelLog(del_buf,del_count,cache[j].url_id)){ logwords[count+j].wrd_id=table[w].wrd_id; logwords[count+j].weight=table[w].weight; logwords[count+j].url_id=cache[j].url_id; #ifdef UDM_STORE_CACHE_WRDPOS logwords[count+j].wrd_pos=cache[j].wrd_pos; #endif #ifdef UDM_STORE_CACHE_TAG logwords[count+j].tag=cache[j].tag; #endif #ifdef UDM_STORE_CACHE_SITEID logwords[count+j].site_id=cache[j].site_id; #endif #ifdef UDM_STORE_CACHE_CATEGORY logwords[count+j].category=cache[j].category; #endif add++; }else{ /*printf("URL %d were not added\n",cache[j].url_id);*/ } } c+=r; count+=add; old_words+=add; } } UDM_FREE(table); close(oldfd); } r=i-first; /* Merge OLD and NEW words */ if(!count)logwords=(UDM_LOGWORD*)malloc((count+r+1)*sizeof(UDM_LOGWORD)); else logwords=(UDM_LOGWORD*)realloc(logwords,(count+r+1)*sizeof(UDM_LOGWORD)); memcpy(logwords+count,buf+first,r*sizeof(UDM_LOGWORD)); count+=r; printf("%s old:%4d new:%4d total:%4d\n",fname,old_words,r,count); /* Sort them in (wrd_id,url_id) order */ qsort(logwords,(size_t)count,sizeof(UDM_LOGWORD),cmpcache); /* Create table */ header.version=0; header.ntables=0; pos=0; /* Add break at the end */ logwords[count].wrd_id=0; logwords[count].weight=0; table_size=4096; table=(UDM_CACHETABLE*)malloc(table_size*sizeof(UDM_CACHETABLE)); for(t=1;t<count+1;t++){ if((logwords[t-1].wrd_id!=logwords[t].wrd_id)|| (logwords[t-1].weight!=logwords[t].weight)){ table[header.ntables].wrd_id=logwords[t-1].wrd_id; table[header.ntables].weight=logwords[t-1].weight; table[header.ntables].pos=pos; table[header.ntables].len=t*sizeof(UDM_CACHEWORD)-pos; pos+=table[header.ntables].len; header.ntables++; if(header.ntables>=table_size){ table_size+=4096; 
table=(UDM_CACHETABLE*)realloc(table,table_size*sizeof(UDM_CACHETABLE)); } } } /* Write filtered and merged */ /* OLD and NEW words into tmp file */ sprintf(tmpname,"%s%c%c.tmp",dirname,scrc[3],scrc[4]); newfd=open(tmpname,O_WRONLY|O_CREAT|O_TRUNC|UDM_BINARY,UDM_IWRITE); write(newfd,&header,sizeof(header)); write(newfd,table,header.ntables*sizeof(UDM_CACHETABLE)); UDM_FREE(table); w=0; while(w<count){ int j; if((count-w)>4096)r=4096; else r=(count-w); for(j=0;j<r;j++){ cache[j].url_id=logwords[w+j].url_id; #ifdef UDM_STORE_CACHE_WRDPOS cache[j].wrd_pos=logwords[w+j].wrd_pos; #endif #ifdef UDM_STORE_CACHE_CATEGORY cache[j].category=logwords[w+j].category; #endif #ifdef UDM_STORE_CACHE_SITEID cache[j].site_id=logwords[w+j].site_id; #endif #ifdef UDM_STORE_CACHE_TAG cache[j].tag=logwords[w+j].tag; #endif } w+=r; write(newfd,cache,r*sizeof(UDM_CACHEWORD)); } close(newfd); UDM_FREE(logwords); /* rename tmp file into */ /* real name */ rename(tmpname,fname); } first=i; } prev=new; } } close(fd); UDM_FREE(buf); /* Remove words from */ /* undone files */ for(i=0;i<256;i++){ if(done[i]==0){ char n[20]; sprintf(n,"%03X",log); sprintf(fname,"%s%c%c%c%c%c%s%02X000",treedir,n[0],n[1],UDMSLASH,n[2],UDMSLASH,n,i); UdmDeleteFromCache(fname,del_buf,del_count); } } UDM_FREE(del_buf); return IND_OK; } /****************** Search stuff ************************************/ typedef struct m { int last; int cur; } UDM_MERG; typedef struct pm { UDM_SEARCHWORD * plast; UDM_SEARCHWORD * pcur; } UDM_PMERG; #define MAXMERGE 1024 UDM_SEARCHWORD * UdmFindCache(UDM_AGENT * Indexer,char *q){ int i; int nmerg=0; int min_num=0; int nwrd=0; int nwrd1=0; int catbase,catmask,tagbase,tagmask; UDM_SEARCHWORD * wrd = NULL; UDM_SEARCHWORD * wrd1 = NULL; UDM_MERG merg[MAXMERGE]; UDM_PMERG pmerg[MAXMERGE]; #ifdef DEBUG_SEARCH unsigned long ticks; #endif UdmPrepare(Indexer,q); UdmDecodeCatStr(Indexer->Conf->catlimit,&catbase,&catmask); UdmDecodeCatStr(Indexer->Conf->taglimit,&tagbase,&tagmask); #ifdef 
DEBUG_SEARCH fprintf(stderr,"Category: %08X/%08X\n",catbase,catmask); fprintf(stderr,"Tag : %08X/%08X\n",tagbase,tagmask); fprintf(stderr,"Site_id : %08X\n",Indexer->Conf->sitelimit); #endif /* url_list=UdmGetURLList(Indexer); */ #ifdef DEBUG_SEARCH fprintf(stderr,"Start read cache %d words\n",Indexer->words_in_query); ticks=UdmStartTimer(); #endif for(i=0;i<Indexer->words_in_query;i++){ int fd1,t; char scrc[16]; char dname[UDMSTRSIZ]; char fname[UDMSTRSIZ]; UDM_CACHEHEADER header; UDM_CACHETABLE table[8192]; UDM_CACHEWORD * cw = NULL; int data_ofs; int group_num=0; int group_first=0; int group; sprintf(scrc,"%08X",Indexer->cwords[i]&0xFFFFF000); sprintf(dname,"%s%c%s%c%c%c%c%c%c", UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH, scrc[0],scrc[1],UDMSLASH,scrc[2],UDMSLASH); #ifdef SHORTNAME sprintf(fname,"%s%c%c",dname,scrc[3],scrc[4]); #else sprintf(fname,"%s%s",dname,scrc); #endif if((fd1=open(fname,O_RDONLY|UDM_BINARY))<0){ #ifdef DEBUG_SEARCH fprintf(stderr,"\tCan't open: %s\n",fname); #endif continue; } #ifdef DEBUG_SEARCH fprintf(stderr,"\tOpened: %s\n",fname); #endif read(fd1,&header,sizeof(header)); data_ofs=sizeof(header)+sizeof(UDM_CACHETABLE)*header.ntables; read(fd1,table,header.ntables*sizeof(UDM_CACHETABLE)); qsort(&table,(size_t)header.ntables,sizeof(UDM_CACHETABLE),cmptable); group_first=nwrd; group=0; for(t=0;t<header.ntables;t++){ int bytes,fweight=0; /* Build weight of doc section */ if(Indexer->weight_factor){ int f; for(f=0;f<8;f++) fweight+=(((table[t].weight>>f)&0x01)*Indexer->wf[f]); }else{ fweight=table[t].weight; } if((Indexer->cwords[i]==table[t].wrd_id)&&(fweight)){ int j,num,real_num=0; cw=(UDM_CACHEWORD*)UdmXmalloc((size_t)table[t].len); lseek(fd1,(off_t)data_ofs+table[t].pos,SEEK_SET); bytes=read(fd1,cw,(size_t)table[t].len); if(bytes==-1)fprintf(stderr,"Error reading cache file %s\n",fname); num=bytes/sizeof(UDM_CACHEWORD); if(!nwrd)wrd=(UDM_SEARCHWORD*)UdmXmalloc(num*sizeof(UDM_SEARCHWORD)); else 
wrd=(UDM_SEARCHWORD*)UdmXrealloc(wrd,(num+nwrd)*sizeof(UDM_SEARCHWORD)); for(j=0;j<num;j++){ register int add=1; /* Check category,tag and site_id match */ #ifdef UDM_STORE_CACHE_CATEGORY add=((cw[j].category&catmask)==catbase); #endif #ifdef UDM_STORE_CACHE_TAG add=add&&((cw[j].tag&tagmask)==tagbase); #endif #ifdef UDM_STORE_CACHE_SITEID add=add&&(Indexer->Conf->sitelimit?(cw[j].site_id==Indexer->Conf->sitelimit):1); #endif /* Add if passed */ if(add){ wrd[nwrd+real_num].url_id=cw[j].url_id; wrd[nwrd+real_num].count=1<<Indexer->wordorders[i]; wrd[nwrd+real_num].weight=fweight; #ifdef UDM_STORE_CACHE_WRDPOS wrd[nwrd+real_num].pos=cw[j].wrd_pos; #endif real_num++; } } if(real_num>0){ if(group<2){ merg[nmerg].cur=nwrd; merg[nmerg].last=nwrd+real_num; nmerg++; } group_num+=real_num; nwrd+=real_num; group++; } } } close(fd1); if((group==0)&&(Indexer->search_mode==UDM_MODE_ALL)){ Indexer->total_found=0; return(NULL); } sprintf(UDM_STREND(Indexer->wordinfo)," %s: %d",Indexer->words[i],group_num); if(nmerg){ merg[nmerg-1].last=nwrd; qsort(wrd+merg[nmerg-1].cur, (size_t)merg[nmerg-1].last-merg[nmerg-1].cur, sizeof(UDM_SEARCHWORD),cmpurlid); } } #ifdef DEBUG_SEARCH ticks=UdmStartTimer() - ticks; fprintf(stderr,"Stop read cache:\t%.2f\n",(float)ticks/1000); fprintf(stderr,"Start merge %d groups, %d words\n",nmerg,nwrd); ticks=UdmStartTimer(); fprintf(stderr,"Allocate buffer for merged words\n"); #endif if(nwrd){ /* Allocate memory for merged words */ wrd1=(UDM_SEARCHWORD*)UdmXmalloc(nwrd*sizeof(UDM_SEARCHWORD)); /* Fill pointers array */ for(i=0;i<nmerg;i++){ pmerg[i].pcur=wrd+merg[i].cur; pmerg[i].plast=wrd+merg[i].last; } } while(nmerg&&nwrd){ register int k, min_val=0x7FFFFFFF; register UDM_PMERG * p; if(nmerg==1){ int num; num=pmerg[0].plast-pmerg[0].pcur; memcpy(&wrd1[nwrd1],pmerg[0].pcur,num*sizeof(UDM_SEARCHWORD)); nwrd1+=num; break; }else{ for(k=0;k<nmerg;k++){ if(min_val>(pmerg[k].pcur)->url_id){ min_val=(pmerg[k].pcur)->url_id; min_num=k; } } p=&pmerg[min_num]; 
memcpy(&wrd1[nwrd1],p->pcur,sizeof(UDM_SEARCHWORD)); (p->pcur)++; if(p->pcur>=p->plast){ nmerg--; if(min_num<nmerg){ memmove(p,p+1,sizeof(UDM_PMERG)*(nmerg-min_num)); } } nwrd1++; } } UDM_FREE(wrd); #ifdef DEBUG_SEARCH ticks=UdmStartTimer() - ticks; fprintf(stderr,"Stop merge groups:\t%.2f nwrd=%d nwrd1=%d\n",(float)ticks/1000,nwrd,nwrd1); #endif if((Indexer->total_found=nwrd1)){ /* Sort in URL order */ if(Indexer->Conf->use_phrases) UdmSortSearchWordsByURL(wrd1,Indexer->total_found); UdmGroupByURL(Indexer,wrd1); } return(wrd1); } /**********************************************/ static int cmplogname(const void * s1,const void * s2){ int n1=atoi((const char*)s1); int n2=atoi((const char*)s2); if(n1<n2)return(-1); else if(n1>n2)return(1); else return(0); } #define LOGNO(x) (((unsigned int)(x))>>20) static int write_cache(UDM_LOGWORD * wrd, int nwrd){ int i,fd,first=0; char name[1024]; if(!nwrd)return(0); /* Sort in word_id order */ qsort(wrd,(size_t)nwrd,sizeof(UDM_LOGWORD),cmplog); /* Barier for last words portion*/ wrd[nwrd].wrd_id=wrd[nwrd-1].wrd_id+0x00100000; for(i=1;i<=nwrd;i++){ if(LOGNO(wrd[i].wrd_id)!=LOGNO(wrd[i-1].wrd_id)){ sprintf(name,"%s%c%s%c%03X.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH,LOGNO(wrd[first].wrd_id)); if((fd=open(name,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){ fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno)); }else{ int nbytes; nbytes=(i-first)*sizeof(UDM_LOGWORD); if(nbytes!=write(fd,wrd+first,(size_t)nbytes)){ fprintf(stderr,"write('%s') error: %s\n",name,strerror(errno)); } close(fd); } first=i; } } return 0; } int UdmPreSplitCacheLog(){ char name[BUFSIZ]; DIR * dirp; struct dirent * dp; char ** logs; int nlogs,i,nwrd,fd; struct stat sb; size_t del_size=0; UDM_LOGWORD * wrd; int wrd_count; UDM_LOGDEL * del; UDM_LOGD_CMD cmd; UDM_LOGD_WRD *log_wrd; sprintf(name,"%s%c%s",UDM_VAR_DIR,UDMSLASH,LOGDIR); printf("Open dir '%s'\n",name); nlogs=0; logs=(char**)malloc(sizeof(char*)); dirp = opendir(name); while ((dp = 
readdir(dirp)) != NULL){ if((strlen(dp->d_name)>4)&&(!strcmp(UDM_STREND(dp->d_name)-4,".wrd"))){ nlogs++; logs=(char**)realloc(logs,nlogs*sizeof(char*)); logs[nlogs-1]=strdup(dp->d_name); logs[nlogs-1][strlen(logs[nlogs-1])-4]='\0'; } } closedir(dirp); qsort(logs,(size_t)nlogs,sizeof(char*),&cmplogname); for(i=0;i<nlogs;i++){ sprintf(name,"%s%c%s%c%s.wrd",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]); printf("Preparing word log %s\n",logs[i]); #define NWRD 3000000 /* nwrd+1 for barier */ if((wrd=(UDM_LOGWORD*)malloc((NWRD+1)*sizeof(UDM_LOGWORD)))){ wrd_count=0; if((fd=open(name,O_RDONLY|UDM_BINARY))<0){ fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno)); }else{ int nbytes; /* Old format while((nbytes=read(fd,wrd,NWRD*sizeof(UDM_LOGWORD)))){ nwrd=nbytes/sizeof(UDM_LOGWORD); printf("Read %d bytes %d words\n",nbytes,nwrd); write_cache(wrd,nwrd); } */ /* Read next document header */ while((nbytes=read(fd,&cmd,sizeof(UDM_LOGD_CMD)))){ nwrd=cmd.nwords; /* printf("Read %d bytes %d words\n",nbytes,nwrd); */ /* if free space is less then document size */ if (NWRD-wrd_count<nwrd){ /* write logs */ write_cache(wrd,wrd_count); wrd_count=0; } /* Read words */ nbytes=nwrd*sizeof(UDM_LOGD_WRD); log_wrd=(UDM_LOGD_WRD*)malloc((size_t)nbytes); if((nbytes!=read(fd,log_wrd,(size_t)nbytes))){ fprintf(stderr,"Read word log error: %s\n",strerror(errno)); }else{ int j; /* Store words in wrd buffer */ for(j=0;j<nwrd;j++){ wrd[wrd_count+j].stamp=cmd.stamp; wrd[wrd_count+j].url_id=cmd.url_id; #ifdef UDM_STORE_CACHE_SITEID wrd[wrd_count+j].site_id=cmd.site_id; #endif #ifdef UDM_STORE_CACHE_TAG wrd[wrd_count+j].tag=cmd.tag; #endif #ifdef UDM_STORE_CACHE_CATEGORY wrd[wrd_count+j].category=cmd.category; #endif wrd[wrd_count+j].wrd_id=log_wrd[j].crc; wrd[wrd_count+j].weight=log_wrd[j].weight; #ifdef UDM_STORE_CACHE_WRDPOS wrd[wrd_count+j].wrd_pos=log_wrd[j].wrd_pos; #endif } wrd_count+=nwrd; } UDM_FREE(log_wrd); } if (wrd_count>0){ write_cache(wrd,wrd_count); wrd_count=0; } close(fd); 
} UDM_FREE(wrd); }else{ fprintf(stderr,"Malloc error: %s\n",strerror(errno)); } } /* FIXME: add checking */ del=(UDM_LOGDEL*)malloc(sizeof(UDM_LOGDEL)); for(i=0;i<nlogs;i++){ sprintf(name,"%s%c%s%c%s.del",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]); printf("Preparing del log %s\n",logs[i]); if(!stat(name,&sb)){ del_size+=sb.st_size; del=(UDM_LOGDEL*)realloc(del,del_size); if((fd=open(name,O_RDONLY|UDM_BINARY))<0){ fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno)); }else{ if(sb.st_size!=read(fd,del+(del_size-sb.st_size)/sizeof(UDM_LOGDEL),(size_t)sb.st_size)){ fprintf(stderr,"read('%s') error: %s\n",name,strerror(errno)); } close(fd); } }else{ fprintf(stderr,"stat('%s') error: %s\n",name,strerror(errno)); } } sprintf(name,"%s%c%s%cdel.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH); if((fd=open(name,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){ fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno)); }else{ if(del_size!=write(fd,del,del_size)){ fprintf(stderr,"write('%s') error: %s\n",name,strerror(errno)); } close(fd); } UDM_FREE(del); /************************************************************* printf("Deleting raw logs\n"); for(i=0;i<nlogs;i++){ sprintf(name,"%s%c%s%c%s.wrd",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]); unlink(name); sprintf(name,"%s%c%s%c%s.del",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]); unlink(name); } *************************************************************/ UDM_FREE(logs); return 0; }