Hello!

We finally found a bug in cache.c. The new version is attached.
Everyone who has had problems with splitter crashes is welcome to test it.
Please, give feedback!
#include "udm_config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <dirent.h>

#include "udm_common.h"
#include "udm_utils.h"
#include "udm_charset.h"
#include "udm_spell.h"
#include "udm_cache.h"
#include "udm_crc32.h"
#include "udm_indexer.h"
#include "udm_db.h"
#include "udm_boolean.h"
#include "udm_searchtool.h"
#include "udm_agent.h"
#include "udm_xmalloc.h"
#include "udm_stopwords.h"
#include "udm_proto.h"

#ifdef HAVE_WINSOCK_H
#include <winsock.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifndef INADDR_NONE
#define INADDR_NONE ((unsigned long) -1)
#endif



#define DEBUG_SEARCH 1


/*
 * Open a TCP (IPv4) connection to hostname:port.
 *
 *   hostname - dotted-quad address, or a name resolved with gethostbyname()
 *   port     - TCP port; 0 is rejected
 *   timeout  - accepted for interface compatibility; not applied here
 *
 * Returns the connected socket descriptor on success. On failure returns
 * UDM_NET_ERROR (bad arguments), UDM_NET_CANT_RESOLVE (name lookup failed
 * or did not give an IPv4 address) or UDM_NET_CANT_CONNECT (socket could
 * not be created or connected). No descriptor is left open on failure.
 */
static int open_host(char *hostname,int port, int timeout)
{
        int net;
        struct hostent *host;
        struct sockaddr_in sa_in;

        (void)timeout;

        if (!hostname || !port)
                return(UDM_NET_ERROR);

        memset(&sa_in,0,sizeof(sa_in));
        /* The socket below is always AF_INET, so the address must be too */
        sa_in.sin_family=AF_INET;
        sa_in.sin_port=htons((u_short)port);

        if ((sa_in.sin_addr.s_addr=inet_addr(hostname)) == INADDR_NONE){
                host=gethostbyname(hostname);
                /* Refuse anything that would not fit into sin_addr */
                if (!host || host->h_addrtype!=AF_INET || host->h_length<=0 ||
                    (size_t)host->h_length>sizeof(sa_in.sin_addr))
                        return(UDM_NET_CANT_RESOLVE);
                memcpy(&sa_in.sin_addr, host->h_addr, (size_t)host->h_length);
        }

        net=socket(AF_INET, SOCK_STREAM, 0);
        if (net<0)
                return(UDM_NET_CANT_CONNECT);

        if(connect(net, (struct sockaddr *)&sa_in, sizeof (sa_in))){
                /* Do not leak the descriptor when the peer is unreachable */
                closesocket(net);
                return(UDM_NET_CANT_CONNECT);
        }

        return(net);
}


/***********************************/

#define LOGDIR  "raw"
#define TREEDIR "tree"
#define SPLDIR  "splitter"

/**********************************/


/*
 * One entry of the table stored right after the header of a cache file.
 * Each entry describes a run of UDM_CACHEWORD records that share the
 * same word id and weight.
 */
typedef struct udm_cache_table {
        int wrd_id;     /* word id shared by the records of this run      */
        int weight;     /* weight shared by the records of this run       */
        int pos;        /* byte offset of the run inside the data area    */
        int len;        /* byte length of the run (records * record size) */
} UDM_CACHETABLE;

/* Header written at the very beginning of a cache file. */
typedef struct udm_cache_hheader {
        int ntables;    /* number of UDM_CACHETABLE entries that follow   */
        int version;    /* format version; the splitter in this file writes 0 */
} UDM_CACHEHEADER;

/*
 * One record of the data area of a cache file. The word id and weight
 * are not stored per record; they come from the UDM_CACHETABLE entry
 * covering the record. Which optional fields exist is decided at
 * compile time by the UDM_STORE_CACHE_* macros, so the record size
 * depends on the build configuration.
 */
typedef struct udm_cacheword_struct {
        int     url_id;         /* document the word occurs in */
#ifdef UDM_STORE_CACHE_WRDPOS
        int     wrd_pos;
#endif
#ifdef UDM_STORE_CACHE_SITEID
        int     site_id;
#endif  
#ifdef UDM_STORE_CACHE_CATEGORY
        int     category;
#endif
#ifdef UDM_STORE_CACHE_TAG
        int     tag;
#endif
} UDM_CACHEWORD;


/******** Convert category string into 32 bit number *************/
/*
 * Decode a category (or tag) string made of up to five two-digit hex
 * levels, e.g. "0102", into a packed value and a matching bit mask.
 *
 *   cat_str - level string; only the first 10 characters are used and a
 *             shorter string is padded with '0'. NULL is treated as "".
 *   cat     - receives the levels packed at bit offsets 25,18,12,6,0
 *   mask    - receives, for every non-zero level, all bits of that
 *             level's field (7,7,6,6,6 bits wide) set
 *
 * Levels that cannot be parsed as hex decode as 0.
 */
static void UdmDecodeCatStr(const char * cat_str, int * cat, int * mask){
        unsigned int t[5]={0,0,0,0,0};
        char str[11];
        size_t n=0;

        /* Take at most 10 characters, then pad with '0' up to 10 */
        if(cat_str){
                while((n<10)&&cat_str[n]){
                        str[n]=cat_str[n];
                        n++;
                }
        }
        while(n<10)str[n++]='0';
        str[10]=0;

        /* t[] is pre-zeroed, so a short match leaves the rest at 0 */
        sscanf(str,"%02x%02x%02x%02x%02x",t+0,t+1,t+2,t+3,t+4);

        /* Unsigned constants: 0x7F<<25 does not fit into a signed int */
        *mask= (int)( (t[0]?0x7Fu<<25:0u) | (t[1]?0x7Fu<<18:0u) |
                (t[2]?0x3Fu<<12:0u) | (t[3]?0x3Fu<<6:0u) | (t[4]?0x3Fu<<0:0u));

        *cat= (int)((t[0]<<25) | (t[1]<<18) | (t[2]<<12) | (t[3]<<6) | (t[4]<<0));
}

/*************************** Sort functions **************************/
/* Sort SEARCHWORD by url_id order */
static int cmpurlid(const void *s1,const void *s2){
        return(((const UDM_SEARCHWORD*)s1)->url_id-((const 
UDM_SEARCHWORD*)s2)->url_id);
}

/* Function to sort LOGWORD list in (WRD_ID,TIME_STAMP) order */
static int cmplog(const void *s1,const void *s2){
unsigned int n1,n2;
        n1=((const UDM_LOGWORD*)s1)->wrd_id;
        n2=((const UDM_LOGWORD*)s2)->wrd_id;
        if(n1==n2){
                n1=((const UDM_LOGWORD*)s1)->url_id;
                n2=((const UDM_LOGWORD*)s2)->url_id;
                if(n1==n2){
                        n1=((const UDM_LOGWORD*)s2)->stamp;
                        n2=((const UDM_LOGWORD*)s1)->stamp;
                }
        }
        if(n1<n2)return(-1);
        if(n1>n2)return(1);
        return(0);
}

/* Function to sort LOGDEL list in URL_ID order */
static int cmpurldellog(const void *s1,const void *s2){
unsigned int n1,n2;
        n1=((const UDM_LOGDEL*)s1)->url_id;
        n2=((const UDM_LOGDEL*)s2)->url_id;
        if(n1==n2){
                n1=((const UDM_LOGDEL*)s2)->stamp;
                n2=((const UDM_LOGDEL*)s1)->stamp;
        }
        if(n1<n2)return(-1);
        if(n1>n2)return(1);
        return(0);
}

/* Function to sort CACHEWORD list in URL_ID order */
static int cmpcache(const void *s1,const void *s2){
int n1,n2;
        n1=((const UDM_LOGWORD*)s1)->wrd_id;
        n2=((const UDM_LOGWORD*)s2)->wrd_id;
        if(n1==n2){
                n1=((const UDM_LOGWORD*)s1)->weight;
                n2=((const UDM_LOGWORD*)s2)->weight;
                if(n1==n2){
                        n1=((const UDM_LOGWORD*)s1)->url_id;
                        n2=((const UDM_LOGWORD*)s2)->url_id;
                }
        }
        if(n1<n2)return(-1);
        if(n1>n2)return(1);
        return(0);
}

/* Sort tables by their length order */
static int cmptable(const void * t1, const void * t2){
        return(((const UDM_CACHETABLE*)t2)->len-((const UDM_CACHETABLE*)t1)->len);
}

/******* These two functions open/close all 256+1 logs ***********/

/*
 * Open the channel the indexer writes word data to.
 *
 * When Conf->logd_addr is not set, two append-only files named after the
 * current time are opened under UDM_VAR_DIR/raw: "<time>.wrd"
 * (Conf->wrd_fd) and "<time>.del" (Conf->del_fd). Otherwise a TCP
 * connection is opened to Conf->logd_addr, written as "host[:port]"
 * (the port defaults to UDM_LOGD_PORT), and stored in Conf->logd_fd.
 *
 * Returns IND_OK on success, or IND_ERROR with a message in errmsg.
 */
int UdmOpenCache(UDM_ENV * Conf,char * errmsg){
        if(!Conf->logd_addr){
                char fname[BUFSIZ];
                /* One time stamp so both logs of a session share a name */
                int stamp=(int)time(NULL);

                sprintf(fname,"%s/raw/%d.wrd",UDM_VAR_DIR,stamp);
                if((Conf->wrd_fd=open(fname,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){
                        sprintf(errmsg,"Can't open word log file: '%s'",strerror(errno));
                        return(IND_ERROR);
                }
                sprintf(fname,"%s/raw/%d.del",UDM_VAR_DIR,stamp);
                if((Conf->del_fd=open(fname,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){
                        sprintf(errmsg,"Can't open del log file: '%s'",strerror(errno));
                        return(IND_ERROR);
                }
        }else{
                char * host,*prt;
                int port=UDM_LOGD_PORT;

                /* Work on a copy: the ':' is overwritten below */
                host=strdup(Conf->logd_addr);
                if(!host){
                        sprintf(errmsg,"Can't allocate memory for cachelogd address");
                        return(IND_ERROR);
                }
                if((prt=strchr(host,':'))){
                        *prt='\0';
                        if(!(port=atoi(prt+1)))
                                port=UDM_LOGD_PORT;
                }
                if((Conf->logd_fd=open_host(host,port,60)) < 0){
                        sprintf(errmsg,"Can't connect to cachelogd at %s:%d",host,port);
                        UDM_FREE(host);
                        /* UdmCloseCache() only closes descriptors > 0 */
                        Conf->logd_fd=0;
                        return(IND_ERROR);
                }
                UDM_FREE(host);
        }
        return(IND_OK);
}

/*
 * Close whatever UdmOpenCache() opened. A descriptor is closed only
 * when its value is greater than zero; the log files are closed with
 * close(), the cachelogd connection with closesocket().
 */
void UdmCloseCache(UDM_ENV * Conf){
        if(Conf->del_fd>0){
                close(Conf->del_fd);
        }
        if(Conf->wrd_fd>0){
                close(Conf->wrd_fd);
        }
        if(Conf->logd_fd>0){
                closesocket(Conf->logd_fd);
        }
}


/*
 * Look url_id up in a delete log sorted by ascending url_id (compared
 * as unsigned). The search is a lower-bound binary search, so when a
 * URL occurs several times the first of its entries is found.
 *
 * Returns the stamp of that entry, or 0 when the URL is not listed.
 */
static int PresentInDelLog(UDM_LOGDEL *buf, int buf_count, const int url_id){
        const unsigned int key=(unsigned int)url_id;
        int lo=0;
        int hi=buf_count;

        while(lo<hi){
                const int mid=(lo+hi)/2;

                if((unsigned int)buf[mid].url_id<key)
                        lo=mid+1;
                else
                        hi=mid;
        }
        if((hi!=buf_count)&&(buf[hi].url_id==url_id))
                return(buf[hi].stamp);
        return(0);
}


/*
 * Remove from cache file "fname" every record whose url_id is listed in
 * the delete log (del_buf/del_count, searched with PresentInDelLog()).
 *
 * The whole file is read, the surviving records are packed into a new
 * contiguous data area with a matching table (word id, weight, offset,
 * length), and the result is written to "<fname>.tmp" and renamed over
 * the original. Tables left without records are dropped; when no table
 * remains the file is unlinked. A file that loses nothing is left alone.
 *
 * Returns 0 on success and 1 when the file cannot be opened, is
 * inconsistent, or the replacement cannot be written; in the failure
 * cases the original file is kept.
 */
static int UdmDeleteFromCache(char *fname, UDM_LOGDEL *del_buf, int del_count){
        int oldfd,newfd;
        int i,ok;
        int rc=1;
        UDM_CACHEHEADER header;
        UDM_CACHEHEADER newheader;
        UDM_CACHETABLE  *table=NULL;
        UDM_CACHEWORD   *cache=NULL;
        UDM_CACHEWORD   *kept=NULL;
        size_t table_bytes,data_len,nwords,nkept;
        char tmpname[UDMSTRSIZ]="";

        if((oldfd=open(fname,O_RDONLY|UDM_BINARY))<0)
                return(1);

        /* Header and table */
        if(((size_t)read(oldfd,&header,sizeof(header))!=sizeof(header))||(header.ntables<0))
                goto cleanup;
        table_bytes=(size_t)header.ntables*sizeof(UDM_CACHETABLE);
        table=(UDM_CACHETABLE*)malloc(table_bytes?table_bytes:1);
        if(!table||((size_t)read(oldfd,table,table_bytes)!=table_bytes))
                goto cleanup;

        /* The data area is as long as all runs together */
        data_len=0;
        for(i=0;i<header.ntables;i++){
                if((table[i].pos<0)||(table[i].len<0))
                        goto cleanup;
                data_len+=(size_t)table[i].len;
        }
        /* Every run must lie inside the data area */
        for(i=0;i<header.ntables;i++){
                if((size_t)table[i].pos+(size_t)table[i].len>data_len)
                        goto cleanup;
        }
        nwords=data_len/sizeof(UDM_CACHEWORD);

        cache=(UDM_CACHEWORD*)malloc(data_len?data_len:1);
        kept=(UDM_CACHEWORD*)malloc(data_len?data_len:1);
        if(!cache||!kept||((size_t)read(oldfd,cache,data_len)!=data_len))
                goto cleanup;
        close(oldfd);
        oldfd=-1;

        /*
         * Copy the survivors of every run into "kept" and rebuild the
         * table in place. The write index into table[] never runs ahead
         * of the read index i, so no unread entry is overwritten.
         */
        nkept=0;
        newheader.version=header.version;
        newheader.ntables=0;
        for(i=0;i<header.ntables;i++){
                size_t first=(size_t)table[i].pos/sizeof(UDM_CACHEWORD);
                size_t count=(size_t)table[i].len/sizeof(UDM_CACHEWORD);
                size_t start=nkept;
                size_t j;

                for(j=first;j<first+count;j++){
                        if(!PresentInDelLog(del_buf,del_count,cache[j].url_id))
                                kept[nkept++]=cache[j];
                }
                if(nkept>start){
                        UDM_CACHETABLE *t=&table[newheader.ntables];

                        t->wrd_id=table[i].wrd_id;
                        t->weight=table[i].weight;
                        t->pos=(int)(start*sizeof(UDM_CACHEWORD));
                        t->len=(int)((nkept-start)*sizeof(UDM_CACHEWORD));
                        newheader.ntables++;
                }
        }

        if(newheader.ntables==0){
                /* Nothing left: an empty cache file is not kept around */
                unlink(fname);
                rc=0;
                goto cleanup;
        }
        if((nkept==nwords)&&(newheader.ntables==header.ntables)){
                /* Nothing was removed: no need to rewrite the file */
                rc=0;
                goto cleanup;
        }

        /* Write new cache file next to the old one, then swap */
        if(strlen(fname)+sizeof(".tmp")>sizeof(tmpname))
                goto cleanup;
        sprintf(tmpname,"%s.tmp",fname);
        if((newfd=open(tmpname,O_WRONLY|O_CREAT|O_TRUNC|UDM_BINARY,UDM_IWRITE))<0)
                goto cleanup;

        table_bytes=(size_t)newheader.ntables*sizeof(UDM_CACHETABLE);
        data_len=nkept*sizeof(UDM_CACHEWORD);
        ok=((size_t)write(newfd,&newheader,sizeof(newheader))==sizeof(newheader));
        ok=ok&&((size_t)write(newfd,table,table_bytes)==table_bytes);
        ok=ok&&((size_t)write(newfd,kept,data_len)==data_len);
        if(close(newfd)!=0)
                ok=0;
        if(ok&&(rename(tmpname,fname)==0)){
                rc=0;
        }else{
                /* Keep the original, drop the incomplete replacement */
                unlink(tmpname);
        }

cleanup:
        if(oldfd>=0)close(oldfd);
        free(table);
        free(cache);
        free(kept);
        return(rc);
}


/***** Write a marker that the old content of url_id should be deleted ****/
/*
 * Without cachelogd (Conf->logd_addr not set) a UDM_LOGDEL record with
 * the current time is appended to the del log (Conf->del_fd). With
 * cachelogd a UDM_LOGD_CMD carrying the url_id, the current time and
 * zero words is sent over Conf->logd_fd.
 *
 * Returns IND_OK, or IND_ERROR with a message stored through
 * UdmAgentErrorMsg(Indexer).
 */
int UdmDeleteURLFromCache(UDM_AGENT * Indexer,int url_id){
        int sent;

        if(!Indexer->Conf->logd_addr){
                UDM_LOGDEL logdel;

                /* Write to del log; zero first so no stray bytes hit the disk */
                memset(&logdel,0,sizeof(logdel));
                logdel.stamp=time(NULL);
                logdel.url_id=url_id;
                sent=write(Indexer->Conf->del_fd,&logdel,sizeof(logdel));

                if(sent!=(int)sizeof(logdel)){
                        sprintf(UdmAgentErrorMsg(Indexer),"Can't write to del log: %s",strerror(errno));
                        return(IND_ERROR);
                }
        }else{
                UDM_LOGD_CMD cmd;

                /* Zero first so padding is not sent uninitialised */
                memset(&cmd,0,sizeof(cmd));
                cmd.stamp=time(NULL);
                cmd.url_id=url_id;
                cmd.site_id=0;
                cmd.tag=0;
                cmd.category=0;
                cmd.cmd=0;
                cmd.nwords=0;

                sent=send(Indexer->Conf->logd_fd,&cmd,sizeof(cmd),0);
                if(sent!=(int)sizeof(cmd)){
                        sprintf(UdmAgentErrorMsg(Indexer),"Can't write to logd: %s",strerror(errno));
                        return(IND_ERROR);
                }
        }
        return(IND_OK);
}


/******** Write the words of one document into the logs *********/
/*
 * Store the words collected in Indexer->Word[0..nwords-1] for url_id.
 *
 * Without cachelogd (Conf->logd_addr not set):
 *   - a UDM_LOGDEL record is always appended to the del log, so older
 *     content of the URL is marked for deletion;
 *   - when there are words, a UDM_LOGD_CMD followed by nwords
 *     UDM_LOGD_WRD records is appended to the word log.
 * With cachelogd the same command and word records are sent over
 * Conf->logd_fd, one send() per record.
 *
 * catstr and tagstr are decoded with UdmDecodeCatStr(); NULL is treated
 * as an empty string. The low 16 bits of Word[i].count go into the
 * weight and, when UDM_STORE_CACHE_WRDPOS is defined, the remaining
 * high bits into wrd_pos.
 *
 * Returns IND_OK, or IND_ERROR with a message stored through
 * UdmAgentErrorMsg(Indexer).
 */
int UdmStoreWordsCache(UDM_AGENT * Indexer,int url_id,int site_id,const char * catstr,const char * tagstr){
        int category,tag,mask,sent;
        size_t i;
        UDM_LOGD_CMD cmd;

        /* Convert category & tag string into 32 bit number */
        UdmDecodeCatStr(catstr?catstr:"",&category,&mask);
        UdmDecodeCatStr(tagstr?tagstr:"",&tag,&mask);

        /* Mark that old content should be deleted    */
        /* Note that this should be done every time   */
        /* even if previous status=0, i.e. first time */
        /* indexing                                   */

        /* Zero first so padding is not written out uninitialised */
        memset(&cmd,0,sizeof(cmd));
        cmd.stamp=time(NULL);
        cmd.url_id=url_id;
        cmd.site_id=site_id;
        cmd.tag=tag;
        cmd.category=category;

        cmd.cmd=0;
        cmd.nwords=Indexer->nwords;
        if(!Indexer->Conf->logd_addr){
                UDM_LOGDEL logdel;
                UDM_LOGD_WRD *words;
                size_t nbytes;

                /* Write to del log */
                memset(&logdel,0,sizeof(logdel));
                logdel.stamp=cmd.stamp;
                logdel.url_id=cmd.url_id;
                sent=write(Indexer->Conf->del_fd,&logdel,sizeof(logdel));
                if(sent!=(int)sizeof(logdel)){
                        sprintf(UdmAgentErrorMsg(Indexer),"Can't write to del log: %s",strerror(errno));
                        return(IND_ERROR);
                }
                if(Indexer->nwords>0){
                        /* Write to word log */
                        sent=write(Indexer->Conf->wrd_fd,&cmd,sizeof(cmd));
                        if(sent!=(int)sizeof(cmd)){
                                sprintf(UdmAgentErrorMsg(Indexer),"Can't write to word log: %s",strerror(errno));
                                return(IND_ERROR);
                        }
                        nbytes=Indexer->nwords*sizeof(UDM_LOGD_WRD);
                        words=(UDM_LOGD_WRD*)malloc(nbytes);
                        if(!words){
                                sprintf(UdmAgentErrorMsg(Indexer),"Can't allocate memory for word buf: %s",strerror(errno));
                                return(IND_ERROR);
                        }
                        memset(words,0,nbytes);
                        for(i=0;i<Indexer->nwords;i++){
                                words[i].weight=Indexer->Word[i].count&0xFFFF;
                                words[i].crc=UdmStrCRC32(Indexer->Word[i].word);
#ifdef UDM_STORE_CACHE_WRDPOS
                                words[i].wrd_pos=Indexer->Word[i].count>>16;
#endif
                        }
                        sent=write(Indexer->Conf->wrd_fd,words,nbytes);
                        if((sent<0)||((size_t)sent!=nbytes)){
                                sprintf(UdmAgentErrorMsg(Indexer),"Can't write to word log: %s",strerror(errno));
                                free(words);
                                return(IND_ERROR);
                        }
                        free(words);
                }
        }else{
                UDM_LOGD_WRD wrd;

                sent=send(Indexer->Conf->logd_fd,&cmd,sizeof(cmd),0);
                if(sent!=(int)sizeof(cmd)){
                        sprintf(UdmAgentErrorMsg(Indexer),"Can't write to logd: %s",strerror(errno));
                        return(IND_ERROR);
                }
                memset(&wrd,0,sizeof(wrd));
                for(i=0;i<Indexer->nwords;i++){
                        wrd.weight=Indexer->Word[i].count&0xFFFF;
                        wrd.crc=UdmStrCRC32(Indexer->Word[i].word);
#ifdef UDM_STORE_CACHE_WRDPOS
                        wrd.wrd_pos=Indexer->Word[i].count>>16;
#endif
                        sent=send(Indexer->Conf->logd_fd,&wrd,sizeof(wrd),0);
                        if(sent!=(int)sizeof(wrd)){
                                sprintf(UdmAgentErrorMsg(Indexer),"Can't write to logd: %s",strerror(errno));
                                return(IND_ERROR);
                        }
                }
        }
        return(IND_OK);
}

/***************** Split one log into cache *****************/

/*
 * Collapse a del log sorted by url_id so that every URL occurs once.
 * Of each run of entries with the same url_id only the first is kept;
 * after sorting with cmpurldellog that is the entry with the newest
 * stamp. The array is compacted in place.
 *
 * Returns the number of entries left (0 for an empty array).
 */
static int RemoveURLDelDups(UDM_LOGDEL *words, int n){
        int i,j;

        /* An empty log has no first entry to keep */
        if(n<=0)return(0);

        j=0;
        for(i=1;i<n;i++){
                if(words[j].url_id!=words[i].url_id){
                        j++;
                        if(i!=j)words[j]=words[i];
                }
        }
        return(j+1);
}

/*
 * Filter a UDM_LOGWORD array in place so that it holds only current
 * records. A record is dropped when
 *   - it repeats the key of the record kept just before it (wrd_id and
 *     url_id, plus wrd_pos when UDM_STORE_CACHE_WRDPOS is defined); in
 *     an array sorted with cmplog the kept one is the newer, or
 *   - the delete log holds a stamp for its url_id that is newer than
 *     the record's own stamp.
 * Every record, including the first, goes through both checks.
 *
 * Returns the number of records left, which may be 0.
 */
static size_t RemoveOldWords(UDM_LOGWORD *words, size_t n,UDM_LOGDEL *del, int del_count){
        size_t i,kept;

        kept=0;
        for(i=0;i<n;i++){
                /* Duplicate of the record kept last? */
                if((kept>0)
                   &&(words[kept-1].wrd_id==words[i].wrd_id)
                   &&(words[kept-1].url_id==words[i].url_id)
#ifdef UDM_STORE_CACHE_WRDPOS
                   &&(words[kept-1].wrd_pos==words[i].wrd_pos)
#endif
                ){
                        continue;
                }
                /* Older than the delete mark of its URL? */
                if(PresentInDelLog(del,del_count,words[i].url_id)>words[i].stamp)
                        continue;

                if(kept!=i)words[kept]=words[i];
                kept++;
        }
        return(kept);
}

/*
 * Unlink every possible cache file below UDM_VAR_DIR/TREEDIR. The
 * files are named <XX>/<Y>/<XX><Y><ZZ>000 with XX and ZZ running over
 * 00..FF and Y over 0..F; each path is printed before it is removed.
 * Always returns 0.
 */
int UdmClearCacheTree(){
        char path[UDMSTRSIZ];
        int top,mid,low;

        for(top=0;top<256;top++){
                for(mid=0;mid<16;mid++){
                        for(low=0;low<256;low++){
                                sprintf(path,"%s%c%s%c%02X%c%01X%c%02X%01X%02X000",
                                        UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH,
                                        top,UDMSLASH,mid,UDMSLASH,top,mid,low);
                                printf("%s\n",path);
                                unlink(path);
                        }
                }
        }
        return(0);
}


int UdmSplitCacheLog(int log){
        int fd,dd,bytes,i;
        size_t bufsize;
        char fname[UDMSTRSIZ]="";
        char dname[UDMSTRSIZ]="";
        char treedir[UDMSTRSIZ]="";
        UDM_LOGWORD * buf;
        UDM_LOGDEL * del_buf;
        int del_count;
        int done[256];
        struct stat sb;

        /* Build path to tree dir */
        sprintf(treedir,"%s%c%s%c",UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH);

        /* Open log file */
        sprintf(fname,"%s%c%s%c%03X.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH,log);
        if((fd=open(fname,O_RDONLY|UDM_BINARY))<0){
                return(IND_ERROR);
        }
        
        /* Open del log file */
        sprintf(dname,"%s%c%s%cdel.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH);
        if((dd=open(dname,O_RDONLY|UDM_BINARY))<0){
                return(IND_ERROR);
        }

        /* Allocate del buffer */
        fstat(dd, &sb);
        del_buf=(UDM_LOGDEL*)malloc((size_t)sb.st_size);
        del_count=read(dd,del_buf,(size_t)sb.st_size)/sizeof(UDM_LOGDEL);
        close(dd);

        /* Remove duplicates URLs in DEL log     */
        /* Keep only oldest records for each URL */
        qsort(del_buf,(size_t)del_count,sizeof(UDM_LOGDEL),cmpurldellog);
        del_count=RemoveURLDelDups(del_buf,del_count);

        /* Allocate buffer for log. FIXME!!! */
        bufsize=sizeof(UDM_LOGWORD)*1024*1024*1; 
        buf=(UDM_LOGWORD*)malloc(bufsize);

        /* Zero all Markers */
        bzero(done,sizeof(done));

        while((bytes=read(fd,buf,bufsize))){
                int new,prev,first=0;
                size_t n;
                n=bytes/sizeof(UDM_LOGWORD);
                qsort(buf,n,sizeof(UDM_LOGWORD),cmplog);
                n=RemoveOldWords(buf,n,del_buf,del_count);
                prev=((unsigned int)buf[0].wrd_id);

                for(i=1;i<n+1;i++){
                        if(i<n)new=((unsigned int)buf[i].wrd_id);
                        else    new=prev+1;

                        /* New cache file ? */
                        if((i==n)||((((unsigned int)prev)>>12)!=(((unsigned 
int)new)>>12))){
                                if(first!=i){
                                        int newfd,oldfd;
                                        char scrc[16];
                                        char dirname[UDMSTRSIZ]="";
                                        char tmpname[UDMSTRSIZ]="";
                                        int r,w,t,pos,count=0;
                                        UDM_CACHEHEADER header;
                                        UDM_CACHETABLE *table;
                                        size_t table_size;
                                        UDM_CACHEWORD cache[4096];
                                        UDM_LOGWORD    * logwords = NULL;
                                        int old_words=0;
                                        
                                        /* Build cache dir and file name */
                                        sprintf(scrc,"%08X",((unsigned 
int)prev)&0xFFFFF000);
                                        
sprintf(dirname,"%s%c%c%c%c%c",treedir,scrc[0],scrc[1],UDMSLASH,scrc[2],UDMSLASH);
                                        strcpy(tmpname,dirname);
                                        UdmBuild(tmpname,0755);
                                        sprintf(fname,"%s%s",dirname,scrc);
                                        
                                        /*printf("%s new: %08X prev: %08X first: %d i: 
%d\n",fname,new,prev,first,i);*/

                                        done[(prev>>12)&0x000000FF]=1;

                                        /* Read OLD words from tree */
                                        if((oldfd=open(fname,O_RDONLY|UDM_BINARY))>=0){
                                                int j;
                                                
                                                /*printf("Read old: %s\n",fname);*/
                                                read(oldfd,&header,sizeof(header));
                                                
table=(UDM_CACHETABLE*)malloc(header.ntables*sizeof(UDM_CACHETABLE));
                                                
read(oldfd,table,header.ntables*sizeof(UDM_CACHETABLE));
                                                        
                                                for(w=0;w<header.ntables;w++){
                                                        int c=0;
                                                        int 
num=table[w].len/sizeof(UDM_CACHEWORD);
                                                        while((r=(num-c))>0){
                                                                int add=0;
                                                                if(r>4096)r=4096;
                                                                
read(oldfd,cache,r*sizeof(UDM_CACHEWORD));
                                                                
if(!count)logwords=(UDM_LOGWORD*)malloc((count+r)*sizeof(UDM_LOGWORD));
                                                                else    
logwords=(UDM_LOGWORD*)realloc(logwords,(count+r)*sizeof(UDM_LOGWORD));
                                                                for(j=0;j<r;j++){
                                                                        /* Add only 
those OLD words which */
                                                                        /* do not 
present in DEL log      */
                                                                        
if(!PresentInDelLog(del_buf,del_count,cache[j].url_id)){
                                                                                
logwords[count+j].wrd_id=table[w].wrd_id;
                                                                                
logwords[count+j].weight=table[w].weight;
                                                                                
logwords[count+j].url_id=cache[j].url_id;
                                                                                #ifdef 
UDM_STORE_CACHE_WRDPOS
                                                                                
logwords[count+j].wrd_pos=cache[j].wrd_pos;
                                                                                #endif
                                                                                #ifdef 
UDM_STORE_CACHE_TAG
                                                                                
logwords[count+j].tag=cache[j].tag;
                                                                                #endif
                                                                                #ifdef 
UDM_STORE_CACHE_SITEID
                                                                                
logwords[count+j].site_id=cache[j].site_id;
                                                                                #endif
                                                                                #ifdef 
UDM_STORE_CACHE_CATEGORY
                                                                                
logwords[count+j].category=cache[j].category;
                                                                                #endif
                                                                                add++;
                                                                        }else{
                                                                                
/*printf("URL %d were not added\n",cache[j].url_id);*/
                                                                        }
                                                                }
                                                                c+=r;
                                                                count+=add;
                                                                old_words+=add;
                                                        }
                                                }
                                                UDM_FREE(table);
                                                close(oldfd);
                                        }
                                        r=i-first;
                                        
                                        /* Merge OLD and NEW words */
                                        
if(!count)logwords=(UDM_LOGWORD*)malloc((count+r+1)*sizeof(UDM_LOGWORD));
                                        else    
logwords=(UDM_LOGWORD*)realloc(logwords,(count+r+1)*sizeof(UDM_LOGWORD));
                                        
memcpy(logwords+count,buf+first,r*sizeof(UDM_LOGWORD));
                                        count+=r;
                                        
                                        printf("%s old:%4d new:%4d 
total:%4d\n",fname,old_words,r,count);
                                        
                                        /* Sort them in (wrd_id,url_id) order */
                                        
qsort(logwords,(size_t)count,sizeof(UDM_LOGWORD),cmpcache);
                                        
                                        
                                        /* Create table */
                                        header.version=0;
                                        header.ntables=0;
                                        pos=0;
                                        
                                        /* Add break at the end */
                                        logwords[count].wrd_id=0;
                                        logwords[count].weight=0;
                                        
                                        table_size=4096;
                                        
table=(UDM_CACHETABLE*)malloc(table_size*sizeof(UDM_CACHETABLE));
                                        
                                        for(t=1;t<count+1;t++){
                                                
if((logwords[t-1].wrd_id!=logwords[t].wrd_id)||
                                                   
(logwords[t-1].weight!=logwords[t].weight)){
                                                        
table[header.ntables].wrd_id=logwords[t-1].wrd_id;
                                                        
table[header.ntables].weight=logwords[t-1].weight;
                                                        table[header.ntables].pos=pos;
                                                        
table[header.ntables].len=t*sizeof(UDM_CACHEWORD)-pos;
                                                        pos+=table[header.ntables].len;
                                                        header.ntables++;
                                                        if(header.ntables>=table_size){
                                                                table_size+=4096;
                                                                
table=(UDM_CACHETABLE*)realloc(table,table_size*sizeof(UDM_CACHETABLE));
                                                        }
                                                }
                                        }
                                        
                                        /* Write filtered and merged       */
                                        /* OLD and NEW words into tmp file */
                                        
sprintf(tmpname,"%s%c%c.tmp",dirname,scrc[3],scrc[4]);
                                        
newfd=open(tmpname,O_WRONLY|O_CREAT|O_TRUNC|UDM_BINARY,UDM_IWRITE);
                                        write(newfd,&header,sizeof(header));
                                        
write(newfd,table,header.ntables*sizeof(UDM_CACHETABLE));
                                        UDM_FREE(table);
                                        
                                        w=0;
                                        while(w<count){
                                                int j;
                                                if((count-w)>4096)r=4096;
                                                else    r=(count-w);
                                                
                                                for(j=0;j<r;j++){
                                                        
cache[j].url_id=logwords[w+j].url_id;
                                                        #ifdef UDM_STORE_CACHE_WRDPOS
                                                        
cache[j].wrd_pos=logwords[w+j].wrd_pos;
                                                        #endif
                                                        #ifdef UDM_STORE_CACHE_CATEGORY
                                                        
cache[j].category=logwords[w+j].category;
                                                        #endif
                                                        #ifdef UDM_STORE_CACHE_SITEID
                                                        
cache[j].site_id=logwords[w+j].site_id;
                                                        #endif
                                                        #ifdef UDM_STORE_CACHE_TAG
                                                        cache[j].tag=logwords[w+j].tag;
                                                        #endif
                                                }
                                                w+=r;
                                                
write(newfd,cache,r*sizeof(UDM_CACHEWORD));
                                        }
                                        close(newfd);
                                        UDM_FREE(logwords);
                                        
                                        /* rename tmp file into */
                                        /* real name            */
                                        rename(tmpname,fname);
                                }
                                first=i;
                        }
                        prev=new;
                }
        }
        close(fd);
        UDM_FREE(buf);
        
        /* Remove words from */
        /* undone files      */
        for(i=0;i<256;i++){
                if(done[i]==0){
                        char    n[20];
                        sprintf(n,"%03X",log);
                        
sprintf(fname,"%s%c%c%c%c%c%s%02X000",treedir,n[0],n[1],UDMSLASH,n[2],UDMSLASH,n,i);
                        UdmDeleteFromCache(fname,del_buf,del_count);
                }
        }
        UDM_FREE(del_buf);
        return IND_OK;
}



/****************** Search stuff ************************************/

/*
 * Pair of integer cursors; UdmFindCache() keeps an array of MAXMERGE
 * of these. NOTE(review): presumably "cur" is the current index and
 * "last" the end index of one list being merged - confirm against the
 * merge loop in UdmFindCache().
 */
typedef struct m {
        int last;
        int cur;
} UDM_MERG;

/*
 * Pointer form of UDM_MERG: a pair of UDM_SEARCHWORD pointers;
 * UdmFindCache() keeps an array of MAXMERGE of these.
 * NOTE(review): presumably "pcur" is the current element and "plast"
 * the end of one list being merged - confirm against UdmFindCache().
 */
typedef struct pm {
        UDM_SEARCHWORD * plast;
        UDM_SEARCHWORD * pcur;
} UDM_PMERG;

/* Number of elements in the merg[] and pmerg[] arrays of UdmFindCache() */
#define MAXMERGE 1024

/*
 * Search the on-disk word cache for the words of query q.
 *
 * For every word prepared by UdmPrepare() the matching cache file is opened,
 * its table directory is read, and the records of every table that belongs
 * to the word (and has a non-zero section weight) are filtered by the
 * category / tag / site limits and appended to one flat array.  The ranges
 * collected this way are then merged by ascending url_id into a second
 * array, which is post-processed by UdmGroupByURL().
 *
 * Indexer - agent holding the configuration and the per-query state;
 *           Indexer->total_found and Indexer->wordinfo are updated.
 * q       - query string, passed to UdmPrepare().
 *
 * Returns the merged array allocated with UdmXmalloc(), or NULL when nothing
 * was found (also when search_mode is UDM_MODE_ALL and one of the words has
 * no matching records).  NOTE(review): the caller presumably owns and frees
 * the returned array - confirm against the callers.
 */
UDM_SEARCHWORD * UdmFindCache(UDM_AGENT * Indexer,char *q){
        int i;
        int nmerg=0;
        int min_num=0;
        int nwrd=0;
        int nwrd1=0;
        int catbase,catmask,tagbase,tagmask;
        UDM_SEARCHWORD * wrd = NULL;
        UDM_SEARCHWORD * wrd1 = NULL;
        UDM_MERG merg[MAXMERGE];
        UDM_PMERG pmerg[MAXMERGE];
#ifdef DEBUG_SEARCH
        unsigned long ticks;
#endif

        UdmPrepare(Indexer,q);
        UdmDecodeCatStr(Indexer->Conf->catlimit,&catbase,&catmask);
        UdmDecodeCatStr(Indexer->Conf->taglimit,&tagbase,&tagmask);

#ifdef DEBUG_SEARCH
        fprintf(stderr,"Category: %08X/%08X\n",catbase,catmask);
        fprintf(stderr,"Tag     : %08X/%08X\n",tagbase,tagmask);
        fprintf(stderr,"Site_id : %08X\n",Indexer->Conf->sitelimit);
#endif
        /* url_list=UdmGetURLList(Indexer); */

#ifdef DEBUG_SEARCH
        fprintf(stderr,"Start read cache %d words\n",Indexer->words_in_query);
        ticks=UdmStartTimer();
#endif
        for(i=0;i<Indexer->words_in_query;i++){
                int fd1,t;
                int ntables=0;
                int header_ok=0;
                char scrc[16];
                char dname[UDMSTRSIZ];
                char fname[UDMSTRSIZ];

                UDM_CACHEHEADER  header;
                UDM_CACHETABLE   table[8192];
                UDM_CACHEWORD  * cw = NULL;
                int data_ofs;
                int group_num=0;
                int group=0;

                /* The file name is derived from the upper 20 bits of the word id */
                sprintf(scrc,"%08X",Indexer->cwords[i]&0xFFFFF000);
                sprintf(dname,"%s%c%s%c%c%c%c%c%c",
                        UDM_VAR_DIR,UDMSLASH,TREEDIR,UDMSLASH,
                        scrc[0],scrc[1],UDMSLASH,scrc[2],UDMSLASH);
#ifdef SHORTNAME
                sprintf(fname,"%s%c%c",dname,scrc[3],scrc[4]);
#else
                sprintf(fname,"%s%s",dname,scrc);
#endif

                if((fd1=open(fname,O_RDONLY|UDM_BINARY))<0){
#ifdef DEBUG_SEARCH
                        fprintf(stderr,"\tCan't open: %s\n",fname);
#endif
                        continue;
                }
#ifdef DEBUG_SEARCH
        fprintf(stderr,"\tOpened: %s\n",fname);
#endif
                /*
                 * Read the header and the table directory.  A short read or
                 * a table count that does not fit into table[] is treated as
                 * a file without tables instead of overrunning the stack.
                 */
                if(read(fd1,&header,sizeof(header))==(ssize_t)sizeof(header)
                   &&(size_t)header.ntables<=sizeof(table)/sizeof(table[0])){
                        size_t want=(size_t)header.ntables*sizeof(UDM_CACHETABLE);

                        if(read(fd1,table,want)==(ssize_t)want){
                                ntables=(int)header.ntables;
                                header_ok=1;
                        }
                }
                if(!header_ok)fprintf(stderr,"Error reading cache file %s\n",fname);

                /* Record data starts right after the table directory */
                data_ofs=sizeof(header)+sizeof(UDM_CACHETABLE)*ntables;
                qsort(table,(size_t)ntables,sizeof(UDM_CACHETABLE),cmptable);

                for(t=0;t<ntables;t++){
                        int bytes,fweight=0;

                        /* Build weight of doc section */
                        if(Indexer->weight_factor){
                                int f;
                                for(f=0;f<8;f++)
                                        fweight+=(((table[t].weight>>f)&0x01)*Indexer->wf[f]);
                        }else{
                                fweight=table[t].weight;
                        }

                        if((Indexer->cwords[i]==table[t].wrd_id)&&(fweight)){
                                int j,num,real_num=0;

                                cw=(UDM_CACHEWORD*)UdmXmalloc((size_t)table[t].len);
                                lseek(fd1,(off_t)data_ofs+table[t].pos,SEEK_SET);
                                bytes=read(fd1,cw,(size_t)table[t].len);

                                if(bytes==-1)fprintf(stderr,"Error reading cache file %s\n",fname);

                                /* A failed read must not be turned into a record count */
                                num=(bytes>0)?(int)((size_t)bytes/sizeof(UDM_CACHEWORD)):0;
                                if(num==0){
                                        UDM_FREE(cw);
                                        continue;
                                }

                                /* Grow the flat array; test the pointer, not nwrd, */
                                /* so an earlier fully filtered table is not leaked  */
                                if(wrd==NULL)
                                        wrd=(UDM_SEARCHWORD*)UdmXmalloc(((size_t)num+nwrd)*sizeof(UDM_SEARCHWORD));
                                else
                                        wrd=(UDM_SEARCHWORD*)UdmXrealloc(wrd,((size_t)num+nwrd)*sizeof(UDM_SEARCHWORD));

                                for(j=0;j<num;j++){
                                        register int add=1;

                                        /* Check category,tag and site_id match */
                                        #ifdef UDM_STORE_CACHE_CATEGORY
                                        add=((cw[j].category&catmask)==catbase);
                                        #endif
                                        #ifdef UDM_STORE_CACHE_TAG
                                        add=add&&((cw[j].tag&tagmask)==tagbase);
                                        #endif
                                        #ifdef UDM_STORE_CACHE_SITEID
                                        add=add&&(Indexer->Conf->sitelimit?(cw[j].site_id==Indexer->Conf->sitelimit):1);
                                        #endif

                                        /* Add if passed */
                                        if(add){
                                                wrd[nwrd+real_num].url_id=cw[j].url_id;
                                                wrd[nwrd+real_num].count=1<<Indexer->wordorders[i];
                                                wrd[nwrd+real_num].weight=fweight;
                                                #ifdef UDM_STORE_CACHE_WRDPOS
                                                wrd[nwrd+real_num].pos=cw[j].wrd_pos;
                                                #endif
                                                real_num++;
                                        }
                                }
                                /* The raw records are no longer needed */
                                UDM_FREE(cw);

                                if(real_num>0){
                                        /*
                                         * Only the first two tables of a word get
                                         * a range of their own; later tables (and
                                         * anything beyond MAXMERGE ranges) extend
                                         * the last range, which is re-sorted below.
                                         */
                                        if((group<2)&&(nmerg<MAXMERGE)){
                                                merg[nmerg].cur=nwrd;
                                                merg[nmerg].last=nwrd+real_num;
                                                nmerg++;
                                        }
                                        group_num+=real_num;
                                        nwrd+=real_num;
                                        group++;
                                }
                        }
                }
                close(fd1);

                /* In ALL mode a word without hits makes the result empty */
                if((group==0)&&(Indexer->search_mode==UDM_MODE_ALL)){
                        UDM_FREE(wrd);
                        Indexer->total_found=0;
                        return(NULL);
                }
                /* NOTE(review): unbounded append, wordinfo capacity is not visible here */
                sprintf(UDM_STREND(Indexer->wordinfo)," %s: %d",Indexer->words[i],group_num);

                if(nmerg){
                        /* Close the last range at the current end and sort it by url_id */
                        merg[nmerg-1].last=nwrd;
                        qsort(wrd+merg[nmerg-1].cur,
                                (size_t)(merg[nmerg-1].last-merg[nmerg-1].cur),
                                sizeof(UDM_SEARCHWORD),cmpurlid);
                }
        }

#ifdef DEBUG_SEARCH
        ticks=UdmStartTimer() - ticks;
        fprintf(stderr,"Stop read cache:\t%.2f\n",(float)ticks/1000);
        fprintf(stderr,"Start merge %d groups, %d words\n",nmerg,nwrd);
        ticks=UdmStartTimer();
        fprintf(stderr,"Allocate buffer for merged words\n");
#endif

        if(nwrd){
                /* Allocate memory for merged words */
                wrd1=(UDM_SEARCHWORD*)UdmXmalloc(nwrd*sizeof(UDM_SEARCHWORD));
                /* Fill pointers array */
                for(i=0;i<nmerg;i++){
                        pmerg[i].pcur=wrd+merg[i].cur;
                        pmerg[i].plast=wrd+merg[i].last;
                }
        }

        /* Merge the ranges: repeatedly take the head with the smallest url_id */
        while(nmerg&&nwrd){
                register int k, min_val=0x7FFFFFFF;
                register UDM_PMERG * p;

                if(nmerg==1){
                        int num;

                        /* Single range left: copy its remainder in one go */
                        num=pmerg[0].plast-pmerg[0].pcur;
                        memcpy(&wrd1[nwrd1],pmerg[0].pcur,num*sizeof(UDM_SEARCHWORD));
                        nwrd1+=num;
                        break;
                }else{
                        /* Never reuse an index left over from a removed range */
                        min_num=0;
                        for(k=0;k<nmerg;k++){
                                if(min_val>(pmerg[k].pcur)->url_id){
                                        min_val=(pmerg[k].pcur)->url_id;
                                        min_num=k;
                                }
                        }
                        p=&pmerg[min_num];
                        memcpy(&wrd1[nwrd1],p->pcur,sizeof(UDM_SEARCHWORD));
                        (p->pcur)++;
                        if(p->pcur>=p->plast){
                                /* Range exhausted: close the gap in pmerg[] */
                                nmerg--;
                                if(min_num<nmerg){
                                        memmove(p,p+1,sizeof(UDM_PMERG)*(nmerg-min_num));
                                }
                        }
                        nwrd1++;
                }
        }
        UDM_FREE(wrd);

#ifdef DEBUG_SEARCH
        ticks=UdmStartTimer() - ticks;
        fprintf(stderr,"Stop merge groups:\t%.2f nwrd=%d nwrd1=%d\n",(float)ticks/1000,nwrd,nwrd1);
#endif

        if((Indexer->total_found=nwrd1)){
                /* Sort in URL order */
                if(Indexer->Conf->use_phrases)
                        UdmSortSearchWordsByURL(wrd1,Indexer->total_found);
                UdmGroupByURL(Indexer,wrd1);
        }
        return(wrd1);
}


/**********************************************/

/*
 * qsort() comparator for an array of log-name strings (char *): orders the
 * names by the integer value atoi() parses from each of them.
 *
 * qsort() passes pointers to the array ELEMENTS, so s1 and s2 are really
 * "char **" and have to be dereferenced once to reach the string itself.
 *
 * Returns -1, 0 or 1 when the first number is less than, equal to or
 * greater than the second one.
 */
static int cmplogname(const void * s1,const void * s2){
        const char * name1=*(const char * const *)s1;
        const char * name2=*(const char * const *)s2;
        int n1=atoi(name1);
        int n2=atoi(name2);

        if(n1<n2)return(-1);
        if(n1>n2)return(1);
        return(0);
}

/*
 * Split-log number of a word id: the value converted to unsigned int and
 * shifted right by 20 bits.  write_cache() prints it as %03X to name the
 * split log a word belongs to.
 */
#define LOGNO(x)        (((unsigned int)(x))>>20)

/*
 * Distribute a buffer of log words over the split logs.
 *
 * The buffer is sorted with cmplog, then every run of words sharing the
 * same LOGNO() value is appended to the file
 * UDM_VAR_DIR/SPLDIR/<LOGNO as %03X>.log.
 *
 * wrd  - buffer to write; it is sorted in place and MUST have room for
 *        nwrd+1 elements, because element [nwrd] is overwritten with a
 *        barrier that terminates the last run.
 * nwrd - number of valid elements in wrd.
 *
 * Open and write failures are reported on stderr and the affected run is
 * skipped.  Always returns 0.
 */
static int write_cache(UDM_LOGWORD * wrd, int nwrd){
        int i,fd,first=0;
        char name[1024];

        if(nwrd<=0)return(0);

        /* Sort in word_id order */
        qsort(wrd,(size_t)nwrd,sizeof(UDM_LOGWORD),cmplog);

        /* Barrier for the last portion of words: its log number differs */
        /* from the last real word, so the loop below flushes that run   */
        wrd[nwrd].wrd_id=wrd[nwrd-1].wrd_id+0x00100000;

        for(i=1;i<=nwrd;i++){
                int nbytes,len;

                /* Still inside the same log range: keep extending the run */
                if(LOGNO(wrd[i].wrd_id)==LOGNO(wrd[i-1].wrd_id))continue;

                len=snprintf(name,sizeof(name),"%s%c%s%c%03X.log",
                        UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH,LOGNO(wrd[first].wrd_id));

                if((len<0)||((size_t)len>=sizeof(name))){
                        /* Never open a truncated (i.e. wrong) file name */
                        fprintf(stderr,"write_cache: log file name too long\n");
                }else if((fd=open(name,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){
                        fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno));
                }else{
                        /* Append the run [first, i) */
                        nbytes=(i-first)*sizeof(UDM_LOGWORD);
                        if(nbytes!=write(fd,wrd+first,(size_t)nbytes)){
                                fprintf(stderr,"write('%s') error: %s\n",name,strerror(errno));
                        }
                        close(fd);
                }
                first=i;
        }
        return 0;
}

int UdmPreSplitCacheLog(){
        char name[BUFSIZ];
        DIR * dirp;
        struct dirent * dp;
        char ** logs;
        int nlogs,i,nwrd,fd;
        struct stat sb;
        size_t del_size=0;
        UDM_LOGWORD * wrd;
        int wrd_count;
        UDM_LOGDEL  * del;
        UDM_LOGD_CMD cmd;
        UDM_LOGD_WRD *log_wrd;

        sprintf(name,"%s%c%s",UDM_VAR_DIR,UDMSLASH,LOGDIR);
        printf("Open dir '%s'\n",name);

        nlogs=0;
        logs=(char**)malloc(sizeof(char*));

        dirp = opendir(name);
        while ((dp = readdir(dirp)) != NULL){
                if((strlen(dp->d_name)>4)&&(!strcmp(UDM_STREND(dp->d_name)-4,".wrd"))){
                        nlogs++;
                        logs=(char**)realloc(logs,nlogs*sizeof(char*));                
     
                        logs[nlogs-1]=strdup(dp->d_name);
                        logs[nlogs-1][strlen(logs[nlogs-1])-4]='\0';
                }
        }
        closedir(dirp);
        
        qsort(logs,(size_t)nlogs,sizeof(char*),&cmplogname);

        for(i=0;i<nlogs;i++){
                
sprintf(name,"%s%c%s%c%s.wrd",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]);
                printf("Preparing word log %s\n",logs[i]);
                
#define NWRD    3000000
                /* nwrd+1 for barier */
                if((wrd=(UDM_LOGWORD*)malloc((NWRD+1)*sizeof(UDM_LOGWORD)))){
                        wrd_count=0;
                        if((fd=open(name,O_RDONLY|UDM_BINARY))<0){
                                fprintf(stderr,"open('%s') error: 
%s\n",name,strerror(errno));
                        }else{
                                int nbytes;
/* Old format
                                while((nbytes=read(fd,wrd,NWRD*sizeof(UDM_LOGWORD)))){
                                        nwrd=nbytes/sizeof(UDM_LOGWORD);
                                        printf("Read %d bytes %d words\n",nbytes,nwrd);
                                        write_cache(wrd,nwrd);
                                }
*/                              
                                /* Read next document header */
                                while((nbytes=read(fd,&cmd,sizeof(UDM_LOGD_CMD)))){
                                        nwrd=cmd.nwords;
                                        /* printf("Read %d bytes %d 
words\n",nbytes,nwrd); */
                                        /* if free space is less then document size */
                                        if (NWRD-wrd_count<nwrd){
                                                /* write logs */
                                                write_cache(wrd,wrd_count);
                                                wrd_count=0;
                                        }
                                        
                                        /* Read words */
                                        nbytes=nwrd*sizeof(UDM_LOGD_WRD);
                                        log_wrd=(UDM_LOGD_WRD*)malloc((size_t)nbytes);
                                        
                                        if((nbytes!=read(fd,log_wrd,(size_t)nbytes))){
                                                fprintf(stderr,"Read word log error: 
%s\n",strerror(errno));
                                        }else{
                                                int j;
                                                /* Store words in wrd buffer */
                                                for(j=0;j<nwrd;j++){
                                                        
wrd[wrd_count+j].stamp=cmd.stamp;
                                                        
wrd[wrd_count+j].url_id=cmd.url_id;
                                                        #ifdef UDM_STORE_CACHE_SITEID
                                                        
wrd[wrd_count+j].site_id=cmd.site_id;
                                                        #endif
                                                        #ifdef UDM_STORE_CACHE_TAG
                                                        wrd[wrd_count+j].tag=cmd.tag;
                                                        #endif
                                                        #ifdef UDM_STORE_CACHE_CATEGORY
                                                        
wrd[wrd_count+j].category=cmd.category;
                                                        #endif
                                                        
wrd[wrd_count+j].wrd_id=log_wrd[j].crc;
                                                        
wrd[wrd_count+j].weight=log_wrd[j].weight;
                                                        #ifdef UDM_STORE_CACHE_WRDPOS
                                                        
wrd[wrd_count+j].wrd_pos=log_wrd[j].wrd_pos;
                                                        #endif
                                                        
                                                }
                                                wrd_count+=nwrd;
                                        }
                                        UDM_FREE(log_wrd);
                                }
                                if (wrd_count>0){
                                        write_cache(wrd,wrd_count);
                                        wrd_count=0;
                                }

                                close(fd);
                        }
                        UDM_FREE(wrd);
                }else{
                        fprintf(stderr,"Malloc error: %s\n",strerror(errno));
                }
        }

        /* FIXME: add checking */
        del=(UDM_LOGDEL*)malloc(sizeof(UDM_LOGDEL));
        for(i=0;i<nlogs;i++){
                
sprintf(name,"%s%c%s%c%s.del",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]);
                printf("Preparing del log %s\n",logs[i]);
                
                if(!stat(name,&sb)){
                        
                        del_size+=sb.st_size;
                        del=(UDM_LOGDEL*)realloc(del,del_size);
                        if((fd=open(name,O_RDONLY|UDM_BINARY))<0){
                                fprintf(stderr,"open('%s') error: 
%s\n",name,strerror(errno));
                        }else{
                                
if(sb.st_size!=read(fd,del+(del_size-sb.st_size)/sizeof(UDM_LOGDEL),(size_t)sb.st_size)){
                                        fprintf(stderr,"read('%s') error: 
%s\n",name,strerror(errno));
                                }
                                close(fd);
                        }
                }else{
                        fprintf(stderr,"stat('%s') error: %s\n",name,strerror(errno));
                }
        }
        sprintf(name,"%s%c%s%cdel.log",UDM_VAR_DIR,UDMSLASH,SPLDIR,UDMSLASH);

        if((fd=open(name,O_WRONLY|O_CREAT|O_APPEND|UDM_BINARY,UDM_IWRITE))<0){
                fprintf(stderr,"open('%s') error: %s\n",name,strerror(errno));
        }else{
                if(del_size!=write(fd,del,del_size)){
                        fprintf(stderr,"write('%s') error: %s\n",name,strerror(errno));
                }
                close(fd);
        }
        UDM_FREE(del);
/*************************************************************
        printf("Deleting raw logs\n");
        for(i=0;i<nlogs;i++){
                
sprintf(name,"%s%c%s%c%s.wrd",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]);
                unlink(name);
                
sprintf(name,"%s%c%s%c%s.del",UDM_VAR_DIR,UDMSLASH,LOGDIR,UDMSLASH,logs[i]);
                unlink(name);
        }               
*************************************************************/
        UDM_FREE(logs);
        
        return 0;
}

Reply via email to