The "deleted" files are only freed by the OS kernel once no IndexReader accesses them any longer. Did you get a new realtime reader after merging and *close* the old one?
----- Uwe Schindler H.-H.-Meier-Allee 63, D-28213 Bremen http://www.thetaphi.de eMail: u...@thetaphi.de > -----Original Message----- > From: Jamie [mailto:ja...@stimulussoft.com] > Sent: Wednesday, September 29, 2010 10:48 AM > To: java-user@lucene.apache.org > Subject: File Handle Leaks During Lucene 3.0.2 Merge > > Hi There > > I am noticing file handle leaks appearing on Index files. I think the leaks occur > during the Lucene merge operation. > Lsof reports the following: > > java 28604 root 213r REG 8,33 1098681 > 57409621 /var/index/vol201009/_a4w.cfs (deleted) > java 28604 root 214r REG 8,33 35164 > 57409699 /var/index/vol201009/_a4x.cfs (deleted) > java 28604 root 215r REG 8,33 46139 > 57409691 /var/index/vol201009/_a4y.cfs (deleted) > java 28604 root 216r REG 8,33 40342 > 57409673 /var/index/vol201009/_a4z.cfs (deleted) > java 28604 root 217r REG 8,33 44204 > 57409675 /var/index/vol201009/_a50.cfs (deleted) > > We are using Lucene's realtime search feature so have handles open on the > index. Could it be that we are not handling the merge situation correctly or > something? Your ideas are most appreciated. 
> > The source code to our index file as follows: > > package com.stimulus.archiva.index; > import com.stimulus.util.*; > > import java.io.File; > import java.io.IOException; > import java.io.PrintStream; > import org.apache.commons.logging.*; > import org.apache.lucene.document.Document; > import org.apache.lucene.index.*; > import org.apache.lucene.store.FSDirectory; > import com.stimulus.archiva.domain.Config; > import com.stimulus.archiva.exception.*; import > com.stimulus.archiva.language.AnalyzerFactory; > import com.stimulus.archiva.search.*; > import java.util.*; > import org.apache.lucene.store.LockObtainFailedException; > import org.apache.lucene.store.AlreadyClosedException; > import java.util.concurrent.locks.ReentrantLock; > import java.util.concurrent.*; > > public class LuceneIndex extends Thread { > > protected ArrayBlockingQueue<LuceneDocument> queue; > protected static final Log logger = > LogFactory.getLog(LuceneIndex.class.getName()); > protected static final Log indexLog = LogFactory.getLog("indexlog"); > IndexWriter writer = null; > protected static ScheduledExecutorService scheduler; > protected static ScheduledFuture<?> scheduledTask; > protected LuceneDocument EXIT_REQ = null; > ReentrantLock indexLock = new ReentrantLock(); > ArchivaAnalyzer analyzer = new ArchivaAnalyzer(); > File indexLogFile; > PrintStream indexLogOut; > IndexProcessor indexProcessor; > String friendlyName; > String indexPath; > int maxSimultaneousDocs; > int indexThreads; > IndexReader reader = null; > volatile boolean reopen = false; > FSDirectory fsDirectory; > ReentrantLock readerLock = new ReentrantLock(); > enum Status { READY, SHUTDOWN }; > Status status = Status.SHUTDOWN; > > public LuceneIndex(int queueSize, LuceneDocument exitReq, String > friendlyName, String indexPath, int maxSimultaneousDocs, int > indexThreads) { > this.queue = new > ArrayBlockingQueue<LuceneDocument>(queueSize); > this.EXIT_REQ = exitReq; > this.friendlyName = friendlyName; > 
this.indexPath = indexPath; > this.maxSimultaneousDocs = maxSimultaneousDocs; > this.indexThreads = indexThreads; > this.status = Status.SHUTDOWN; > // if (indexLog.isDebugEnabled()) { > //setLog(friendlyName); > //} > } > > > public int getMaxSimultaneousDocs() { > return maxSimultaneousDocs; > } > > public void setMaxSimultaneousDocs(int maxSimultaneousDocs) { > this.maxSimultaneousDocs = maxSimultaneousDocs; > } > > > public ReentrantLock getIndexLock() { > return indexLock; > } > > protected void setLog(String logName) { > > try { > indexLogFile = getIndexLogFile(logName); > if (indexLogFile!=null) { > if (indexLogFile.length()>10485760) > indexLogFile.delete(); > indexLogOut = new PrintStream(indexLogFile); > } > logger.debug("set index log file path > {path='"+indexLogFile.getCanonicalPath()+"'}"); > } catch (Exception e) { > logger.error("failed to open index log file:"+e.getMessage(),e); > } > > } > > protected File getIndexLogFile(String logName) { > try { > String logfilepath = > Config.getFileSystem().getLogPath()+File.separator+"indexdebug_"+logName+". > log"; > return new File(logfilepath); > } catch (Exception e) { > logger.error("failed to open index log file:"+e.getMessage(),e); > return null; > } > } > > > > protected void openIndex() throws MessageSearchException { > Exception lastError = null; > > if (writer==null) { > logger.debug("openIndex() index "+friendlyName+" will be opened. it > is currently closed."); > } else { > logger.debug("openIndex() did not bother opening index > "+friendlyName+". 
it is already open."); > return; > } > logger.debug("opening index "+friendlyName+" for write"); > logger.debug("opening search index "+friendlyName+" for write > {indexpath='"+indexPath+"'}"); > boolean writelock; > int attempt = 0; > int maxattempt = 10; > > if > (Config.getConfig().getIndex().getMultipleIndexProcesses()) { > maxattempt = 10000; > } else { > maxattempt = 10; > } > > do { > writelock = false; > try { > fsDirectory = FSDirectory.open(new File(indexPath)); > int maxIndexChars = > Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new > IndexWriter(fsDirectory,analyzer,new > IndexWriter.MaxFieldLength(maxIndexChars)); > if (indexLog.isDebugEnabled() && > indexLogOut!=null) { > writer.setInfoStream(indexLogOut); > } > } catch (LockObtainFailedException lobfe) { > logger.debug("write lock on index "+friendlyName+". will > reopen in 50ms."); > try { Thread.sleep(50); } catch (Exception e) {} > attempt++; > writelock = true; > } catch (CorruptIndexException cie) { > throw new MessageSearchException("index "+friendlyName+" > appears to be corrupt. 
please reindex the active > volume."+cie.getMessage(),logger); > } catch (Throwable io) { > throw new MessageSearchException("failed to write document to > index "+friendlyName+":"+io.getMessage(),logger); > } > } while (writelock && attempt<maxattempt); > if (attempt>=10000) > throw new MessageSearchException("failed to open index > "+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger); > } > > public void indexDocument(LuceneDocument luceneDocument) throws > MessageSearchException { > logger.debug("index document {"+luceneDocument+"}"); > if (status==Status.SHUTDOWN) { > throw new MessageSearchException("index is shutdown.",logger); > } > long s = (new Date()).getTime(); > if (luceneDocument == null) > throw new MessageSearchException("assertion failure: > null document",logger); > try { > queue.put(luceneDocument); > } catch (InterruptedException ie) { > throw new MessageSearchException("failed to add document to > queue:"+ie.getMessage(),ie,logger); > } > logger.debug("document indexed successfully {"+luceneDocument+"}"); > > logger.debug("indexing message end {"+luceneDocument+"}"); > long e = (new Date()).getTime(); > logger.debug("indexing time {time='"+(e-s)+"'}"); > } > > public class DocWriter implements Runnable { > > LuceneDocument doc; > String language; > LinkedList<LuceneDocument> pushbacks; > ReentrantLock pushbackLock; > > public DocWriter(LuceneDocument doc,String > language,LinkedList<LuceneDocument> pushbacks, ReentrantLock > pushbackLock) { > this.doc = doc; > this.language = language; > this.pushbacks = pushbacks; > } > > public void run() { > try { > > writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language > ,AnalyzerFactory.Operation.INDEX)); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > try { > pushbackLock.lock(); > pushbacks.add(doc); > } finally { > pushbackLock.unlock(); > } > } > } > > } > > > > public 
class IndexProcessor extends Thread { > > public IndexProcessor() { > setName("index processor"); > } > > public void run() { > boolean exit = false; > LuceneDocument luceneDocument = null; > LinkedList<LuceneDocument> pushbacks = new > LinkedList<LuceneDocument>(); > ReentrantLock pushbackLock = new ReentrantLock(); > > > while (!exit) { > > > //documentPool = > Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThr > eads()); > luceneDocument = null; > try { > luceneDocument = (LuceneDocument) queue.take(); > } catch (InterruptedException e) { > logger.debug("index exit req received. exiting"); > exit = true; > continue; > } > if (luceneDocument==EXIT_REQ) { > logger.debug("index exit req received. exiting"); > exit = true; > continue; > } > try { > > indexLock.lock(); > > if (luceneDocument==null) { > logger.debug("index info is null"); > } > int i = 0; > ExecutorService threadPool = > Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFact > ory("indexwritepool",Thread.NORM_PRIORITY,true)); > > while(luceneDocument!=null && > i<maxSimultaneousDocs) { > Document doc = > luceneDocument.getDocument(); > String language = doc.get("lang"); > if (language==null) { > language = > Config.getConfig().getIndex().getIndexLanguage(); > } > DocWriter docWriter = new > DocWriter(luceneDocument,language,pushbacks,pushbackLock); > threadPool.submit(docWriter); > > i++; > if (i<maxSimultaneousDocs) { > luceneDocument = (LuceneDocument) > queue.poll(); > > if (luceneDocument==null) { > logger.debug("index info is > null"); > } > > if (luceneDocument==EXIT_REQ) { > logger.debug("index exit > req received. 
exiting (2)"); > exit = true; > break; > } > } > > } > threadPool.shutdown(); > threadPool.awaitTermination(30,TimeUnit.MINUTES); > try { > pushbackLock.lock(); > if (pushbacks.size()>0) { > for (LuceneDocument pushback : > pushbacks) { > try { > > writer.addDocument(pushback.getDocument()); > } catch (IOException io) { > logger.error("failed to add > document to index:"+io.getMessage(),io); > } catch (AlreadyClosedException > e) { > pushbacks.add(pushback); > } > i++; > } > } > } finally { > pushbackLock.unlock(); > } > logger.debug("index commit"); > try { > if (writer!=null) { > writer.commit(); > } > } catch (Exception e) { > logger.error("failed to commit > index:"+e.getMessage(),e); > try { > readerLock.lock(); > closeIndex(); > openIndex(); > } finally { > readerLock.unlock(); > } > } > > } catch (Throwable ie) { > logger.error("index write > interrupted:"+ie.getMessage(),ie); > } finally { > > indexLock.unlock(); > } > } > logger.debug("exit indexer"); > } > > public class IndexDocument extends Thread { > > LuceneDocument luceneDocument = null; > List<LuceneDocument> pushbacks = null; > > public IndexDocument(LuceneDocument > luceneDocument,List<LuceneDocument> pushbacks) { > this.luceneDocument = luceneDocument; > this.pushbacks = pushbacks; > setName("index document"); > } > > public void run() { > try { > > writer.addDocument(luceneDocument.getDocument()); > } catch (IOException io) { > logger.error("failed to add document to > index:"+io.getMessage(),io); > } catch (AlreadyClosedException e) { > pushbacks.add(luceneDocument); > } catch (Throwable t) { > logger.error("failed to add document to > index:"+t.getMessage(),t); > } > }}; > } > > protected void closeIndex() { > try { > indexLock.lock(); > > > if (writer!=null) { > writer.close(); > } > > if (fsDirectory!=null) { > fsDirectory.close(); > } > } catch (Throwable io) { > logger.error("failed to close index > writer:"+io.getMessage(),io); > } finally { > writer = null; > indexLock.unlock(); > } > } 
> > > > public void optimize() throws MessageSearchException { > logger.debug("optimize volume"); > try { > indexLock.lock(); > try { > writer.optimize(false); > } catch (Exception io) { > throw new MessageSearchException("failed to > optimize the index:"+io.getMessage(),io,logger); > } > } catch (Throwable t) { // diskspace problems could arise > logger.error("failed to optimize > index:"+t.getMessage(),t); > } finally { > indexLock.unlock(); > } > > } > public void deleteDocs(Term[] terms) throws > MessageSearchException { > logger.debug("delete docs"); > if (status==Status.SHUTDOWN) { > throw new MessageSearchException("index is > shutdown.",logger); > } > try { > indexLock.lock(); > openIndex(); > try { > writer.deleteDocuments(terms); > } catch (Exception e) { > throw new MessageSearchException("failed to > delete doc from index:"+e.getMessage(),e,logger); > } finally { > try { > writer.commit(); > writer.expungeDeletes(false); > } catch (Exception io) { > throw new MessageSearchException("failed to > expunge docs from index:"+io.getMessage(),io,logger); > } > } > } catch (Throwable t) { > logger.error("failed to delete docs from > index."+t.getMessage(),t); > } finally { > indexLock.unlock(); > } > } > > > public void deleteIndex() throws MessageSearchException { > logger.debug("delete index > {indexpath='"+indexPath+"'}"); > try { > indexLock.lock(); > closeIndex(); > File indexFile = new File(indexPath); > //deleteDirContents(indexFile); > try { > int maxIndexChars = > Config.getConfig().getIndex().getMaxIndexPerFieldChars(); > writer = new > IndexWriter(FSDirectory.open(indexFile),analyzer,true,new > IndexWriter.MaxFieldLength(maxIndexChars)); > } catch (Throwable cie) { > logger.error("failed to delete index > {index='"+indexPath+"'}",cie); > return; > } finally { > try { writer.close(); } catch (Exception e) { > logger.debug("failed to close writer:"+e.getMessage()); } > writer = null; > } > } finally { > openIndex(); > indexLock.unlock(); > } > } > > 
public void startup() throws MessageSearchException { > logger.debug("luceneindex is starting up"); > > > File lockFile = new File(indexPath+File.separatorChar + > "write.lock"); > if (lockFile.exists()) { > if > (Config.getConfig().getIndex().getMultipleIndexProcesses()) { > logger.debug("index lock file detected on > volumeindex startup."); > } else { > logger.warn("index lock file detected. the server > was shutdown incorrectly. automatically deleting lock file. > {lockFile='"+lockFile.getPath()+"'}"); > lockFile.delete(); > } > } > openIndex(); > scheduler = > Executors.newScheduledThreadPool(1,ThreadUtil.getFlexibleThreadFactory("in > dex > reopen",Thread.NORM_PRIORITY-1,true)); > scheduledTask = scheduler.scheduleWithFixedDelay(new > Runnable() { public void run() { reopen=true; }},1,1,TimeUnit.SECONDS); > > indexProcessor = new IndexProcessor(); > indexProcessor.start(); > > > Runtime.getRuntime().addShutdownHook(this); > status = Status.READY; > } > > public IndexReader getReader() throws MessageSearchException { > if (status==Status.SHUTDOWN) { > throw new MessageSearchException("index is > shutdown.",logger); > } > readerLock.lock(); > try { > if (writer==null) { > throw new MessageSearchException("cannot retrieve > reader. writer is closed (or null)",logger); > } > if (reader == null) { > reader = new VolumeIndexReader(writer.getReader(5)); > } else { > try { > if (reopen) { > reader = new > VolumeIndexReader(writer.getReader(5)); > reopen = false; > } > } catch (AlreadyClosedException ace) { > logger.debug("reader was found closed. 
> reopening"); > reader = new > VolumeIndexReader(writer.getReader(5)); > } > } > } catch (IOException io) { > throw new MessageSearchException("failed to retrieve > reader from writer:"+io.getMessage(),io,logger); > } finally { > readerLock.unlock(); > } > return reader; > } > > > public void shutdown() { > status = Status.SHUTDOWN; > try { queue.put(EXIT_REQ); } catch (InterruptedException > e) {} > > if (reader!=null) { > try { > reader.close(); > } catch (Exception e) { > logger.error("failed to close index > reader:"+e.getMessage()); > } > } > reader = null; > if (scheduler!=null) { > scheduler.shutdown(); > } > closeIndex(); > > indexProcessor.interrupt(); > > if (scheduler!=null) { > scheduler.shutdownNow(); > } > > } > > @Override > public void run() { > shutdown(); > } > > > public interface LuceneDocument { > > public String toString(); > public Document getDocument(); > public void finalize(); > > } > > public static void deleteDirContents(File path) { > if( path.exists() ) { > File[] files = path.listFiles(); > for(int i=0; i<files.length; i++) { > if(files[i].isFile()) { > files[i].delete(); > } > } > } > } > > > > } > > > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org > For additional commands, e-mail: java-user-h...@lucene.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org For additional commands, e-mail: java-user-h...@lucene.apache.org