Dear Lucene devs,
We are from the Hermes team. Hermes is a project based on Lucene 3.5 and
Solr 3.5.
Hermes processes 100 billion documents per day, and 2,000 billion documents
in total (over two months). Our single-cluster index size is now over 200 TB,
and the total size is 600 TB. We use Lucene to speed up our big data warehouse
and reduce analysis response time, for example for filters such as age=32 and
keywords like 'lucene', or for operations such as count, sum, order by, group
by and so on.
Hermes can filter 1,000 billion records in 1 second. An order by over
10 billion records takes 10 s, a group by over 10 billion records takes 15 s,
and sum, avg, max, min statistics over 10 billion records take 30 s.
For these purposes we have made many improvements on top of Lucene and Solr.
Lucene has changed a great deal as of version 4.10 and the code is now very
different, so we do not want to commit our code to Lucene. We only want to
introduce our improvements, which are based on Lucene 3.5, and to describe how
Hermes can process 100 billion documents per day on 32 physical machines. We
think it may be helpful for people who have similar scenarios.
First-level index (tii), loading on demand
Original:
1. The .tii file is loaded into RAM by TermInfosReaderIndex.
2. This can be quite slow when an index is first opened.
3. The index has to stay open persistently; once opened, it is never closed.
4. This limits the number of indexes. With thousands of indexes it becomes
impossible.
Our improvement:
1. Load on demand; not all fields need to be loaded into memory.
2. We modified the method getIndexOffset (a binary search) to run on disk
rather than in memory, and we use an LRU cache to speed it up.
3. Running getIndexOffset on disk saves a lot of memory and reduces the time
needed to open an index.
4. Hermes often opens different indexes for different businesses. When an
index is not used often, we close it (managed by LRU).
5. In this way one physical machine can hold more than 100,000 indexes.
Problems solved:
1. Hermes needs to store more than 1,000 billion documents, and we do not have
enough memory to hold the tii files.
2. We have more than 100,000 indexes. If all of them were open, they would
consume a huge number of file descriptors, which the file system will not allow.
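To illustrate the on-disk binary search with an LRU cache, here is a minimal
Java sketch. It is not the Hermes code and it does not use the real tii
layout: it assumes a hypothetical file of sorted 8-byte keys, and the names
OnDiskTermIndex, floorSlot and writeKeys are made up for this example.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: floor search over sorted 8-byte keys kept on disk (not the real tii layout). */
final class OnDiskTermIndex {
    private static final int ENTRY_BYTES = 8;
    private final RandomAccessFile file;
    private final long entryCount;
    private final Map<Long, Long> cache; // slot -> key, evicted in LRU order
    int diskReads = 0;

    OnDiskTermIndex(File path, final int cacheSize) {
        RandomAccessFile raf;
        long count;
        try {
            raf = new RandomAccessFile(path, "r");
            count = raf.length() / ENTRY_BYTES;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.file = raf;
        this.entryCount = count;
        // An access-ordered LinkedHashMap drops the least recently used slot first.
        this.cache = new LinkedHashMap<Long, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                return size() > cacheSize;
            }
        };
    }

    private long keyAt(long slot) {
        Long cached = cache.get(slot);
        if (cached != null) return cached;
        try {
            file.seek(slot * ENTRY_BYTES);
            long key = file.readLong();
            diskReads++;
            cache.put(slot, key);
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the slot of the greatest key <= target, or -1 if there is none. */
    long floorSlot(long target) {
        long lo = 0, hi = entryCount - 1, result = -1;
        while (lo <= hi) {
            long mid = (lo + hi) >>> 1;
            if (keyAt(mid) <= target) { result = mid; lo = mid + 1; }
            else { hi = mid - 1; }
        }
        return result;
    }

    /** Demo helper: writes sorted keys to a temporary file. */
    static File writeKeys(long... sortedKeys) {
        try {
            File path = File.createTempFile("termindex", ".bin");
            path.deleteOnExit();
            try (RandomAccessFile out = new RandomAccessFile(path, "rw")) {
                for (long key : sortedKeys) out.writeLong(key);
            }
            return path;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Only the slots touched by the search are read, so memory use is bounded by
the cache size rather than by the size of the index file.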
Building indexes on HDFS
1. We modified the Lucene 3.5 code in 2013 so that we can build indexes
directly on HDFS (Lucene has supported HDFS since 4.0).
2. All offline data is built by MapReduce on HDFS.
3. We moved all real-time indexes from local disk to HDFS.
4. We can ignore disk failures because the indexes are on HDFS.
5. We can move a process from one machine to another, because the data is on
HDFS.
6. We can quickly recover an index when a disk failure happens.
7. We do not need to recover data when a machine breaks down (the indexes are
so big that moving them would take many hours); the process can quickly move to
another machine, driven by the ZooKeeper heartbeat.
8. As we all know, an index on HDFS is slower than one on the local file
system. But why? On the local file system the OS applies many optimizations and
uses a lot of cache to speed up random access, so we need similar optimization
on HDFS. When people say that an HDFS index is slow, the reason is that it has
not been optimized.
9. We split each HDFS file into fixed-length blocks, 1 KB per block, and use
an LRU cache to cache them. Reads of the tii file and of frequent terms become
faster.
10. Some HDFS files do not need to be closed immediately. We keep them in an
LRU cache to reduce how often files are opened.
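The block cache in point 9 can be sketched roughly as follows. This is only
an illustration under assumptions: BlockSource is a hypothetical stand-in for
an HDFS input stream, and BlockCache is not a Lucene, Solr or Hadoop class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a remote file, for example an HDFS input stream. */
interface BlockSource {
    byte[] readBlock(long blockIndex, int blockSize);
}

/** Sketch of a fixed-size block cache with LRU eviction. */
final class BlockCache {
    static final int BLOCK_SIZE = 1024; // 1 KB per block, as in the text
    private final BlockSource source;
    private final Map<Long, byte[]> blocks;
    int misses = 0;

    BlockCache(BlockSource source, final int maxBlocks) {
        this.source = source;
        // Access order = true, so the eldest entry is the least recently used block.
        this.blocks = new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
                return size() > maxBlocks;
            }
        };
    }

    /** Reads one byte at an absolute file position through the cache. */
    byte readByte(long position) {
        long blockIndex = position / BLOCK_SIZE;
        byte[] block = blocks.get(blockIndex);
        if (block == null) {
            block = source.readBlock(blockIndex, BLOCK_SIZE);
            misses++;
            blocks.put(blockIndex, block);
        }
        return block[(int) (position % BLOCK_SIZE)];
    }
}
```

Frequently read regions, such as the tii file, stay in the cache, while
blocks that are rarely touched are evicted first.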
Improving Solr so that one core can dynamically serve multiple indexes
Original:
1. A Solr core (one process) only serves 1~N indexes, fixed by the Solr config.
Our improvement:
1. We use partitions, as in Oracle or Hadoop Hive. Instead of building one big
index, we build many indexes, partitioned by day (or month, year, or another
partition key).
2. Tables are created dynamically for dynamic business needs.
Problems solved:
1. An index that grows beyond Integer.MAX_VALUE documents would overflow the
docid.
2. Sometimes a search does not need to cover all of the data; it may only need
the most recent 3 days.
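As a small illustration of partition pruning, the sketch below keeps one
index per day and only returns the indexes inside the requested date range.
The class name, the yyyyMMdd key format and the index names are assumptions
made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Sketch: one index per day, looked up by a yyyyMMdd partition key. */
final class PartitionedTable {
    // Partition key (yyyyMMdd) -> hypothetical index name for that day.
    private final TreeMap<String, String> partitions = new TreeMap<String, String>();

    void addPartition(String day, String indexName) {
        partitions.put(day, indexName);
    }

    /** Returns only the indexes whose day falls in [fromDay, toDay]. */
    List<String> indexesFor(String fromDay, String toDay) {
        return new ArrayList<String>(partitions.subMap(fromDay, true, toDay, true).values());
    }
}
```

A query for the last 3 days then opens 3 small indexes instead of one huge
one, and each index stays well below the docid limit.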
Label mark technology for doc values
Original:
1. group by, sort, sum, max, min, avg and similar statistics need to read the
original values from the tis file.
2. FieldCacheImpl loads all term values into memory for the Solr
fieldValueCache, even if we only compute statistics over one record.
3. The first search is quite slow because it has to build the fieldValueCache
and load all term values into memory.
Our improvement:
1. In general the data contains many repeated values, for example in the sex
field or the age field.
2. Storing the original values would waste a lot of storage, so we made a
small modification to TermInfosWriter and added a new field called termNumber.
TermInfosWriter writes the unique terms sorted by term, and we give each term
a unique number from first to last (much like Solr's UnInvertedField).
3. We use the termNum (we call it a label) instead of the term. We store the
labels in a file called doctotm. The doctotm file is ordered by docid and each
label is stored at a fixed length, so the file supports random reads (like
fdx, which is also fixed length) and does not need to be loaded fully into
memory.
4. Label order is the same as term order, so calculations such as order by or
group by only read the labels; we do not need to read the original values.
5. Some fields, such as the sex field, have only 2 distinct values, so we use
only 2 bits (not 2 bytes) to store the label. This saves a lot of disk I/O.
6. When all calculations are finished, we translate labels back to terms
through a dictionary.
7. If many rows have the same original value, that value is stored only once
and read only once.
Problems solved:
1. Hermes's data is very big; we do not have enough memory to load all values
into memory as Lucene's FieldCacheImpl or Solr's UnInvertedField do.
2. In real-time mode the data changes frequently, and the cache is frequently
invalidated by appends or updates. Rebuilding FieldCacheImpl takes a lot of
time and I/O.
3. The original value is a Lucene Term, which is a string. When sorting or
grouping, string values need a lot of memory and a lot of CPU time for
hashCode, compare and equals, whereas a label is a number and is fast.
4. The label is a number whose type may be byte, short or integer, depending
on the maximum label value.
5. Reading the original values needs a lot of I/O because it iterates over the
tis file, even if we only need to read one document.
6. It removes the long time spent building FieldCacheImpl on first use.
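The label idea can be sketched in a few lines of Java. This is an in-memory
illustration only, not the doctotm file format: LabelColumn is a hypothetical
name, and the sketch picks the smallest bit width that fits the number of
distinct terms, which may differ from the widths Hermes actually uses.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Sketch: per-document labels (term ordinals) packed at a fixed bit width. */
final class LabelColumn {
    final String[] dictionary;   // label -> term, in sorted term order
    final int bitsPerLabel;
    private final long[] packed; // fixed-width labels, ordered by docid

    LabelColumn(String[] valuePerDoc) {
        dictionary = new TreeSet<String>(Arrays.asList(valuePerDoc)).toArray(new String[0]);
        bitsPerLabel = Math.max(1, 32 - Integer.numberOfLeadingZeros(dictionary.length - 1));
        packed = new long[(valuePerDoc.length * bitsPerLabel + 63) / 64];
        for (int doc = 0; doc < valuePerDoc.length; doc++) {
            long label = Arrays.binarySearch(dictionary, valuePerDoc[doc]);
            long bit = (long) doc * bitsPerLabel;
            int word = (int) (bit >>> 6), shift = (int) (bit & 63);
            packed[word] |= label << shift;
            if (shift + bitsPerLabel > 64) {
                // The label straddles two words; store the high bits in the next one.
                packed[word + 1] |= label >>> (64 - shift);
            }
        }
    }

    /** Random access: reads the label of one document without decoding the others. */
    int label(int doc) {
        long bit = (long) doc * bitsPerLabel;
        int word = (int) (bit >>> 6), shift = (int) (bit & 63);
        long value = packed[word] >>> shift;
        if (shift + bitsPerLabel > 64) {
            value |= packed[word + 1] << (64 - shift);
        }
        return (int) (value & ((1L << bitsPerLabel) - 1));
    }

    /** Translates a label back to the original term through the dictionary. */
    String term(int label) {
        return dictionary[label];
    }
}
```

Because labels are assigned in sorted term order, comparing two labels gives
the same result as comparing the two terms, so sorting and grouping can work
on small integers and look up the dictionary only at the end.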
Two-phase search
Original:
1. group by and order by use the original values. The real value may be a
string and may be large, and reading it may need a lot of I/O because the tis
and frq files have to be read.
2. Comparing strings is slower than comparing integers.
Our improvement:
1. We split one search into a multi-phase search.
2. The first phase only searches the fields used for order by and group by.
3. The first phase does not need to read the original (real) values; it only
reads the docid and label (see "Label mark technology for doc values") for
order by and group by.
4. When all order by and group by work is finished, we may only need to return
the top N records, so we start the next phase to fetch the original values of
those top N records.
Problems solved:
1. Less disk I/O: reading original values takes a lot of disk I/O.
2. Less network I/O (for the merger).
3. Most fields have repeated values, and a repeated value only needs to be read
once. For a group by field, the original value is read only once per label,
when it is displayed to the user.
4. Most searches only need to display the top N (N<=100) results, so with a
two-phase search the original values of the other records can be skipped.
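A minimal sketch of the two phases, assuming the labels are already available
as an int[] indexed by docid and the dictionary as a String[]. The class and
method names are hypothetical; a real implementation would read both from
index files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Sketch: phase 1 ranks docids by label, phase 2 resolves values for the top N only. */
final class TwoPhaseSearch {
    /** Phase 1: returns the docids of the N largest labels, best first. */
    static List<Integer> topDocsByLabel(final int[] labelPerDoc, int n) {
        // Min-heap on label, so the weakest of the current top N sits on top.
        PriorityQueue<Integer> heap = new PriorityQueue<Integer>(
                Math.max(1, n), (a, b) -> Integer.compare(labelPerDoc[a], labelPerDoc[b]));
        for (int doc = 0; doc < labelPerDoc.length; doc++) {
            heap.add(doc);
            if (heap.size() > n) heap.poll();
        }
        List<Integer> result = new ArrayList<Integer>();
        while (!heap.isEmpty()) result.add(0, heap.poll());
        return result;
    }

    /** Phase 2: translates labels to original values, but only for the winners. */
    static List<String> resolve(List<Integer> docs, int[] labelPerDoc, String[] dictionary) {
        List<String> values = new ArrayList<String>();
        for (int doc : docs) values.add(dictionary[labelPerDoc[doc]]);
        return values;
    }
}
```

Phase 1 never touches a string; only the N winning documents pay the cost of
a dictionary lookup in phase 2.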
Multi-phase indexing
1. Hermes does not update the index one document at a time; it indexes in
batches.
2. The index is split into four areas: doclist => buffer index => RAM index
=> disk index/HDFS index.
3. The doclist only stores the SolrInputDocument objects for a batch update or
append.
4. The buffer index is a RAMDirectory, used for merging the doclist into an
index.
5. The RAM index is also a RAMDirectory, but it is bigger than the buffer
index and it can be searched by the user.
6. The disk/HDFS index is the persistent store, used for the big index.
7. We also use a WAL called binlog (like the MySQL binlog) for recovery.
Two-phase commit for updates
1. We do not update records one at a time as Solr does (Solr searches by term,
finds the document, deletes it, and then appends a new one); one at a time is
slow.
2. We need an atomic increment on a field, which Solr cannot support; Solr only
supports replacing a field value. An atomic increment has to read the last
value first and then increase it.
3. Hermes updates a document with a pre-mark delete followed by a batch commit.
4. A document in the pre-marked state can still be searched by the user until
we commit. We modified SegmentReader and split deletedDocs into 3 parts: one
is called deletedDocstmp, which holds the pre-marks (pending deletes); another
is called deletedDocs_forsearch, which is used for index search; the third is
still called deletedDocs.
5. When we want to pending-delete a document, we mark it in deletedDocstmp (an
OpenBitSet) and then append the new value to the doclist area (buffer area).
Pending delete means the user can still search the old value, and the buffer
area means the user cannot yet search the new value. When we commit (in a
batch), the old value is really dropped and the whole buffer area is flushed
to the RAM area (which can be searched).
6. We call the pending delete a virtual delete, and after the commit we call it
a physical delete.
7. Hermes often virtually deletes many documents and then commits once, which
performs better than committing one by one.
8. We also use a lot of caching to speed up the atomic increment.
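The pre-mark and batch commit flow can be sketched as follows. This is a
simplified in-memory model, not the modified SegmentReader: it uses
java.util.BitSet in place of OpenBitSet, plain strings in place of documents,
and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Sketch: deletes are pre-marked and only take effect for searches on commit. */
final class PendingDeleteSegment {
    private final BitSet pendingDeletes = new BitSet(); // role of deletedDocstmp
    private final BitSet searchDeletes = new BitSet();  // role of deletedDocs_forsearch
    private final List<String> buffer = new ArrayList<String>();     // not searchable yet
    private final List<String> searchable = new ArrayList<String>(); // docid = position

    PendingDeleteSegment(List<String> initialDocs) {
        searchable.addAll(initialDocs);
    }

    /** Pre-marks the old document and buffers its replacement. */
    void update(int oldDocId, String newValue) {
        pendingDeletes.set(oldDocId);
        buffer.add(newValue);
    }

    /** Batch commit: pending deletes become real, buffered documents become visible. */
    void commit() {
        searchDeletes.or(pendingDeletes);
        pendingDeletes.clear();
        searchable.addAll(buffer);
        buffer.clear();
    }

    /** Returns the values currently visible to a search. */
    List<String> visible() {
        List<String> out = new ArrayList<String>();
        for (int doc = 0; doc < searchable.size(); doc++) {
            if (!searchDeletes.get(doc)) out.add(searchable.get(doc));
        }
        return out;
    }
}
```

Until commit() runs, a search still sees the old value and never sees a
half-applied update; after commit() it sees only the new value.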
Term data skew
Original:
1. Lucene uses an inverted index to store terms and their doclists.
2. Some fields, such as sex, have only two values, male or female, so male
will hold about 50% of the doclist.
3. Solr uses the filter cache to cache each FQ; a cached FQ is an OpenBitSet
that stores the doclist.
4. The first time an FQ is used (not yet cached), Solr reads a lot of doclist
data to build the OpenBitSet, which takes a lot of disk I/O.
5. Most of the time we only need the top N docs, and we do not care about
sorting by score.
Our improvement:
1. We usually combine the FQ with other FQs and use the skip list of the
doclist to skip docids that are not needed (see the query method called
advance).
2. We do not cache the OpenBitSet per FQ. Instead we cache blocks of the frq
files in memory, to speed up the places that are read often.
3. Our index is very big; caching FQs as OpenBitSets would take a lot of
memory.
4. We modified IndexSearcher to support a real top N search that ignores
sorting by doc score.
Problems solved:
1. Data skew causes a lot of disk I/O for reading unnecessary doclist data.
2. A 2,000 billion document index is too big; an FQ cache (filter cache) that
uses OpenBitSet takes a lot of memory.
3. Most searches only need the top N results and do not need sorting by score,
so we need to speed up the search time.
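The combination of advance-based skipping and an unscored top N can be
sketched like this. It is an illustration under assumptions: DocCursor is a
hypothetical array-backed cursor that uses a binary search where a real
postings list would use its skip data, and docids are assumed to be
non-negative and sorted.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: conjunction of two sorted docid lists using advance(), stopping at N hits. */
final class LeapfrogIntersection {
    static final int NO_MORE_DOCS = Integer.MAX_VALUE;

    /** Minimal cursor over a sorted docid array (hypothetical, array-backed). */
    static final class DocCursor {
        private final int[] docs;
        private int pos = -1;

        DocCursor(int[] sortedDocs) {
            this.docs = sortedDocs;
        }

        /** Moves to the first docid >= target and returns it, or NO_MORE_DOCS. */
        int advance(int target) {
            // A real postings list would consult its skip data here.
            int lo = Math.max(pos, 0), hi = docs.length;
            while (lo < hi) {
                int mid = (lo + hi) >>> 1;
                if (docs[mid] < target) lo = mid + 1; else hi = mid;
            }
            pos = lo;
            return pos < docs.length ? docs[pos] : NO_MORE_DOCS;
        }
    }

    /** Returns the first n docids present in both lists, without scoring. */
    static List<Integer> firstN(int[] a, int[] b, int n) {
        List<Integer> hits = new ArrayList<Integer>();
        DocCursor ca = new DocCursor(a), cb = new DocCursor(b);
        int da = ca.advance(0), db = cb.advance(0);
        while (hits.size() < n && da != NO_MORE_DOCS && db != NO_MORE_DOCS) {
            if (da == db) {
                hits.add(da);
                da = ca.advance(da + 1);
                db = cb.advance(db + 1);
            } else if (da < db) {
                da = ca.advance(db); // skip everything in a that is below db
            } else {
                db = cb.advance(da); // skip everything in b that is below da
            }
        }
        return hits;
    }
}
```

With a skewed term such as sex=male, the long doclist is only visited at the
docids proposed by the more selective filter, and the loop stops as soon as N
hits are found.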
Block-Buffer-Cache
OpenBitSet and fieldValueCache need to allocate a big long[] or int[] array.
This pattern is often seen in caches such as UnInvertedField, FieldCacheImpl,
filterQueryCache and so on. Most of the time many of the elements are zero
(empty).
Original:
1. We create the big array directly, and when we no longer need it we leave it
to the JVM GC.
Our improvement:
1. We split the big array into fixed-length blocks; each block is a small
array with a fixed length of 1024.
2. If a block's elements are almost all empty (zero), we use a HashMap instead
of an array.
3. If a block has no non-zero values at all (length=0), we do not create the
block array; we use null in its place.
4. When a block is no longer in use, we collect its array into a buffer and
reuse it next time.
Problems solved:
1. It saves memory.
2. It reduces JVM garbage collection, which takes a lot of CPU resources.
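A minimal sketch of the blocked array, covering points 1, 3 and 4 (the
HashMap variant for nearly empty blocks is left out). BlockedLongArray is a
hypothetical name, and the shared pool in this sketch is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

/** Sketch: a large logical long[] stored as lazily allocated 1024-element blocks. */
final class BlockedLongArray {
    static final int BLOCK_SIZE = 1024;
    // Shared pool of released blocks, reused instead of being left to the GC.
    private static final ArrayDeque<long[]> POOL = new ArrayDeque<long[]>();
    private final long[][] blocks; // a null entry means "all zeros"

    BlockedLongArray(long length) {
        blocks = new long[(int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE)][];
    }

    long get(long index) {
        long[] block = blocks[(int) (index / BLOCK_SIZE)];
        return block == null ? 0L : block[(int) (index % BLOCK_SIZE)];
    }

    void set(long index, long value) {
        int b = (int) (index / BLOCK_SIZE);
        if (blocks[b] == null) {
            if (value == 0L) return; // writing zero into an empty block needs no array
            long[] reused = POOL.poll();
            blocks[b] = reused != null ? reused : new long[BLOCK_SIZE];
        }
        blocks[b][(int) (index % BLOCK_SIZE)] = value;
    }

    int allocatedBlocks() {
        int n = 0;
        for (long[] block : blocks) if (block != null) n++;
        return n;
    }

    /** Zeroes all blocks and returns them to the pool for reuse. */
    void release() {
        for (int i = 0; i < blocks.length; i++) {
            if (blocks[i] != null) {
                Arrays.fill(blocks[i], 0L);
                POOL.push(blocks[i]);
                blocks[i] = null;
            }
        }
    }
}
```

A sparse array of one million elements with a single non-zero value then
allocates one 1024-element block instead of one million longs.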
WeakHashMap, HashMap and synchronized problems
1. FieldCacheImpl uses a WeakHashMap to manage the field value cache, and it
has a memory leak bug.
2. SolrInputDocument uses many HashMap and LinkedHashMap instances for fields,
which wastes a lot of memory.
3. AttributeSource uses a WeakHashMap to cache implementation classes, and it
uses a global synchronized block that reduces performance.
4. AttributeSource is a base class and NumericField extends AttributeSource,
but it creates many HashMap instances that NumericField never uses.
5. Because of all this, the JVM GC carries a heavy burden from HashMaps that
are never used.
Our improvement:
1. WeakHashMap does not perform well; we use SoftReference instead.
2. We reuse NumericField to avoid creating AttributeSource frequently.
3. We do not use the global synchronized block.
After finishing these optimizations, our processing speed went up from
20,000/s to 60,000/s (1 KB per document).
Other GC optimizations
1. Reuse the byte[] arrays in the input buffer and output buffer.
2. Reuse the byte[] arrays in RAMFile.
3. Remove some finalize methods that are not necessary.
4. Use StringHelper.intern to reuse the field names in SolrInputDocument.
Directory optimization
1. An index commit does not need to sync all the files.
2. We use a block cache on top of FSDirectory and hdfsDirectory to speed up
reads.
3. We close indexes or index files that are not used often, and we limit the
maximum number of open indexes. The block cache is managed by LRU.
Network optimization
1. We optimized the ThreadPool in the SearchHandler class; sometimes a
keep-alive connection is not needed, and we increased the timeout for large
indexes.
2. We removed Jetty and wrote the socket layer ourselves; importing data
through Jetty did not perform well.
3. We changed data import from push mode to pull mode, similar to Apache Storm.
Append mode optimization
1. In append mode we do not store the field values in the fdt file. Doing so
would cost a lot of I/O during index merges, and it is not needed.
2. We store the field data in a single separate file in Hadoop SequenceFile
format, and we use LZO compression to save I/O.
3. We keep a pointer from each docid to its position in the SequenceFile.
Non-tokenized field optimization
1. For non-tokenized fields we do not store the field value in the fdt file.
2. We read the field value from the label (see "Label mark technology for doc
values").
3. Most fields have duplicate values, so this reduces the index file size.
Multi-level merger servers
1. Solr can only use one shard to act as the merger server.
2. We use multiple levels of merger servers to merge the results of all shards.
3. Shards on the same machine are merged first, with high priority, by the
merger server on that machine.
Solr's merger is like this:
[figure: Solr merger]
Hermes's merger is like this:
[figure: Hermes multi-level merger]
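As a rough illustration of the two merge levels, the sketch below merges the
top N rows of the shards on each machine first and then merges the
per-machine results. The class name and the use of plain Long sort keys are
assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch: top-N results are merged per machine first, then across machines. */
final class MultiLevelMerger {
    /** Combines partial results and keeps the n largest values, largest first. */
    static List<Long> mergeTopN(List<List<Long>> partials, int n) {
        List<Long> all = new ArrayList<Long>();
        for (List<Long> partial : partials) all.addAll(partial);
        Collections.sort(all, Collections.reverseOrder());
        return new ArrayList<Long>(all.subList(0, Math.min(n, all.size())));
    }

    /** shardsByMachine: for each machine, the top-N list of each local shard. */
    static List<Long> search(List<List<List<Long>>> shardsByMachine, int n) {
        List<List<Long>> machineResults = new ArrayList<List<Long>>();
        for (List<List<Long>> localShards : shardsByMachine) {
            // Level 1: the merger on each machine reduces its own shards to n rows.
            machineResults.add(mergeTopN(localShards, n));
        }
        // Level 2: the top-level merger only sees n rows per machine.
        return mergeTopN(machineResults, n);
    }
}
```

With many shards per machine, the top-level merger receives n rows per
machine instead of n rows per shard, which reduces network traffic.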
Other optimizations
1. Hermes supports SQL.
2. It supports union SQL across different tables.
3. It supports view tables.
Finally
Hermes's SQL may look like this:
- select
higo_uuid,thedate,ddwuid,dwinserttime,ddwlocaltime,dwappid,dwinituserdef1,dwclientip,sclientipv6,dwserviceip,dwlocaiip,dwclientversion,dwcmd,dwsubcmd,dwerrid,dwuserdef1,dwuserdef2,dwuserdef3,dwuserdef4,cloglevel,szlogstr
from sngsearch06,sngsearch09,sngsearch12 where thedate in ('20140917') and
ddwuin=5713 limit 0,20
- select thedate,ddwuin,dwinserttime,ddwlocaltime from sngsearch12 where
thedate in ('20140921') and ddwuin=5713 order by ddwlocaltime desc limit 0,10
- select count(*),count(ddwuid) from sngsearch03 where thedate=20140921 limit
0,100
- select sum(acnt),average(acnt),max(acnt),min(acnt) from sngsearch03 where
thedate=20140921 limit 0,100
- select thedate,ddwuid,sum(acnt),count(*) from sngsearch18 where thedate in
(20140908) and ddwuid=7823 group by thedate,ddwuid limit 0,100;
- select count(*) from guangdiantong where thedate ='20141010' limit 0,100
- select freqtype,fspenttime,fmodname,yyyymmddhhmmss,hermestime,freqid from
guangdiantong where thedate ='20141010' limit 0,100
- select freqtype,fspenttime,fmodname,yyyymmddhhmmss,hermestime,freqid from
guangdiantong where thedate ='20141010' order by yyyymmddhhmmss desc limit 0,10
- select miniute1,count(*) from guangdiantong where thedate ='20141010' group
by miniute1 limit 0,100
- select miniute5,count(*) from guangdiantong where thedate ='20141010' group
by miniute5 limit 0,100
- select hour,miniute15,count(*) from guangdiantong where thedate ='20141010'
group by hour,miniute15 order by miniute15 desc limit 0,100
- select hour,count(*),sum(fspenttime),average(fspenttime),average(ferrorcode)
from guangdiantong where thedate ='20141010' and freqtype=1 group by hour
limit 0,100
- select freqtype,count(*),sum(fspenttime),average(fspenttime) from
guangdiantong where thedate ='20141010' and (freqtype>=10000 and
freqtype<=10100) group by freqtype limit 0,100
- select freqtype,count(*),sum(fspenttime),average(fspenttime) from
guangdiantong where thedate ='20141010' and (freqtype>=10000 and
freqtype<=10100) group by freqtype order by average(fspenttime) desc limit 0,100
- select hour,miniute15,count(*),sum(fspenttime),average(fspenttime) from
guangdiantong where thedate ='20141010' group by hour,miniute15 order by
miniute15 desc limit 0,100
- select
thedate,yyyymmddhhmmss,miniute1,miniute5,miniute15,hour,hermestime,freqtype,freqname,freqid,fuid,fappid,fmodname,factionname,ferrorcode,ferrormsg,foperateret,ferrortype,fcreatetime,fspenttime,fserverip,fversion
from guangdiantong where thedate ='20141010' order by yyyymmddhhmmss desc
limit 0,100
________________________________
yannianmu(母延年)