Are there other threads involved, besides the one hung in close?  Can
you post their stack traces?

This stack trace seems to indicate that IW believes another thread is
in the process of closing.

Can you call IndexWriter.setInfoStream and post the output leading to the hang?

Mike

On Fri, Oct 9, 2009 at 5:04 AM, Jamie Band <ja...@stimulussoft.com> wrote:
> HI Michael / Uwe / others
>
> Sorry for the repost... it just does not look like the earlier message I
> sent go through.
> FYI: there are no large Lucene merges taking place.
>
> Jamie Band wrote:
>>
>> Hi Michael
>>
>> Thanks for your help. Here are the stacks:
>>
>> index processor [TIME_WAITING] CPU time: 33:01
>> java.lang.Object.wait(long)
>> org.apache.lucene.index.IndexWriter.doWait()
>> org.apache.lucene.index.IndexWriter.shouldClose()
>> org.apache.lucene.index.IndexWriter.close(boolean)
>> org.apache.lucene.index.IndexWriter.close()
>> com.stimulus.archiva.index.VolumeIndex.closeIndex()
>> com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
>>
>> The source code to our indexer is attached. As you can see, documents are
>> added to a blocking queue. The index processor thread takes it out of the
>> queue and processes it. After about 60k documents IndexWriter's close method
>> enters TIME_WAITING indefinitely. It there any workaround to this problem?
>>
>>
>> package com.stimulus.archiva.index;
>>
>> import java.io.File;
>> import java.io.IOException;
>> import java.io.PrintStream;
>> import javax.mail.MessagingException;
>> import org.apache.commons.logging.*;
>> import org.apache.lucene.document.Document;
>> import org.apache.lucene.index.*;
>> import org.apache.lucene.store.FSDirectory;
>> import com.stimulus.archiva.domain.Config;
>> import com.stimulus.archiva.domain.Email;
>> import com.stimulus.archiva.domain.EmailID;
>> import com.stimulus.archiva.domain.Indexer;
>> import com.stimulus.archiva.domain.Volume;
>> import com.stimulus.archiva.exception.*;
>> import com.stimulus.archiva.language.AnalyzerFactory;
>> import com.stimulus.archiva.search.*;
>> import java.util.*;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.ScheduledExecutorService;
>> import java.util.concurrent.ScheduledFuture;
>> import java.util.concurrent.TimeUnit;
>> import org.apache.lucene.store.LockObtainFailedException;
>> import org.apache.lucene.store.AlreadyClosedException;
>> import java.util.concurrent.locks.ReentrantLock;
>> import java.util.concurrent.*;
>>
>> public class VolumeIndex extends Thread {
>>          protected ArrayBlockingQueue<IndexInfo> queue;
>>        protected static final Log logger =
>> LogFactory.getLog(VolumeIndex.class.getName());
>>           IndexWriter writer = null;
>>           Volume volume;
>>           protected static ScheduledExecutorService scheduler;
>>        protected static ScheduledFuture<?> scheduledTask;
>>        protected static IndexInfo EXIT_REQ = new IndexInfo(null);
>>        ReentrantLock indexLock = new ReentrantLock();
>>        ArchivaAnalyzer analyzer     = new ArchivaAnalyzer();
>>        Indexer indexer = null;
>>        File indexLogFile;
>>        PrintStream indexLogOut;
>>        IndexProcessor indexProcessor;
>>                         public VolumeIndex(Indexer indexer, Volume volume)
>> {
>>               logger.debug("creating new volume index {"+volume+"}");
>>               this.volume = volume;
>>               this.indexer = indexer;
>>               this.queue = new
>> ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
>>                           try {
>>                   indexLogFile = getIndexLogFile(volume);
>>                   if (indexLogFile!=null) {
>>                       if (indexLogFile.length()>10485760)
>>                           indexLogFile.delete();
>>                       indexLogOut = new PrintStream(indexLogFile);
>>                   }
>>                   logger.debug("set index log file path
>> {path='"+indexLogFile.getCanonicalPath()+"'}");
>>               } catch (Exception e) {
>>                   logger.error("failed to open index log
>> file:"+e.getMessage(),e);
>>               }
>>               startup();
>>           }
>>               protected File getIndexLogFile(Volume volume) {
>>              try {
>>                   String indexpath = volume.getIndexPath();
>>                   int lio = indexpath.lastIndexOf(File.separator)+1;
>>                   String logfilepath =
>> indexpath.substring(lio,indexpath.length()-1);
>>                   logfilepath += ".log";
>>                   logfilepath = "index_"+logfilepath;
>>                   logfilepath =
>> Config.getFileSystem().getLogPath()+File.separator+logfilepath;
>>                   return new File(logfilepath);
>>               } catch (Exception e) {
>>                   logger.error("failed to open index log
>> file:"+e.getMessage(),e);
>>                   return null;
>>               }
>>         }
>>                public void deleteMessages(List<String> ids) throws
>> MessageSearchException {
>>             if (ids == null)
>>                   throw new MessageSearchException("assertion failure:
>> null ids",logger);
>>                         Term[] terms = new Term[ids.size()];
>>             int c = 0;
>>             StringBuffer deleteInfo = new StringBuffer();
>>             for (String id : ids) {
>>                 terms[c++] = new Term("uid",id);
>>                 deleteInfo.append(id);
>>                 deleteInfo.append(",");
>>             }
>>                         String deleteStr = deleteInfo.toString();
>>             if (deleteStr.length()>0 &&
>> deleteStr.charAt(deleteStr.length()-1)==',')
>>                 deleteStr = deleteStr.substring(0,deleteStr.length()-1);
>>                         logger.debug("delete messages
>> {'"+deleteInfo+"'}");
>>             try {
>>                 indexLock.lock();
>>                 openIndex();
>>                 try {
>>                     writer.deleteDocuments(terms);
>>                     writer.expungeDeletes();
>>                 } catch (Exception e) {
>>                     throw new MessageSearchException("failed to delete
>> email from index.",e,logger);
>>                 } finally {
>>                                     }
>>             } finally {
>>                 closeIndex();
>>                 indexLock.unlock();
>>             }
>>       }
>>               protected void openIndex() throws MessageSearchException {
>>            Exception lastError = null;
>>                     if (writer==null) {
>>               logger.debug("openIndex() index will be opened. it is
>> currently closed.");
>>           } else {
>>               logger.debug("openIndex() did not bother opening index. it
>> is already open.");
>>               return;
>>           }
>>           logger.debug("opening index for write {"+volume+"}");
>>           indexer.prepareIndex(volume);
>>           logger.debug("opening search index for write
>> {indexpath='"+volume.getIndexPath()+"'}");
>>           boolean writelock;
>>           int attempt = 0;
>>           int maxattempt = 10;
>>                     if
>> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>>               maxattempt = 10000;
>>            } else {
>>               maxattempt = 10;
>>            }
>>                     do {
>>               writelock = false;
>>               try {
>>                       FSDirectory fsDirectory =
>> FSDirectory.getDirectory(volume.getIndexPath());
>>                       int maxIndexChars =
>> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>>                       writer = new IndexWriter(fsDirectory,analyzer,new
>> IndexWriter.MaxFieldLength(maxIndexChars));
>>                       if (logger.isDebugEnabled() && indexLogOut!=null) {
>>                           writer.setInfoStream(indexLogOut);
>>                       }
>>               } catch (LockObtainFailedException lobfe) {
>>                       logger.debug("write lock on index. will reopen in
>> 50ms.");
>>                       try { Thread.sleep(50); } catch (Exception e) {}
>>                       attempt++;
>>                       writelock = true;
>>               } catch (CorruptIndexException cie) {
>>                   throw new MessageSearchException("index appears to be
>> corrupt. please reindex the active volume."+cie.getMessage(),logger);
>>               } catch (IOException io) {
>>                   throw new MessageSearchException("failed to write
>> document to index:"+io.getMessage(),logger);
>>               }
>>          } while (writelock && attempt<maxattempt);
>>          if (attempt>=10000)
>>            throw new MessageSearchException("failed to open index writer
>> {location='"+volume.getIndexPath()+"'}",lastError,logger);
>>       }
>>             public void indexMessage(Email message) throws
>> MessageSearchException  {
>>           logger.debug("index message {"+message+"}");
>>           long s = (new Date()).getTime();
>>           if (message == null)
>>               throw new MessageSearchException("assertion failure: null
>> message",logger);
>>           Document doc = new Document();
>>           IndexInfo indexInfo = new IndexInfo(doc);
>>           try {
>>              DocumentIndex docIndex = new DocumentIndex(indexer);
>>              String language = doc.get("lang");
>>              if (language==null)
>>                  language = indexer.getIndexLanguage();
>>              docIndex.write(message,doc,indexInfo);
>>              queue.put(indexInfo);
>>              logger.debug("message indexed successfully
>> {"+message+",language='"+language+"'}");
>>           } catch (MessagingException me) {
>>              throw new MessageSearchException("failed to decode message
>> during indexing",me,logger, ChainedException.Level.DEBUG);
>>           } catch (IOException me) {
>>               throw new MessageSearchException("failed to index
>> message"+me.getMessage()+" {"+message+"}",me,logger,
>> ChainedException.Level.DEBUG);
>>           } catch (ExtractionException ee)
>>           {
>>               // we will want to continue indexing
>>              //throw new MessageSearchException("failed to decode
>> attachments in message {"+message+"}",ee,logger,
>> ChainedException.Level.DEBUG);
>>           } catch (AlreadyClosedException ace) {
>>               indexMessage(message);
>>           } catch (Throwable e) {
>>               throw new MessageSearchException("failed to index
>> message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
>>           }
>>           logger.debug("indexing message end {"+message+"}");
>>           long e = (new Date()).getTime();
>>           logger.debug("indexing time {time='"+(e-s)+"'}");
>>       }
>>         public class IndexProcessor extends Thread {
>>                     public IndexProcessor() {
>>               setName("index processor");
>>           }
>>               public void run() {
>>               boolean exit = false;
>>               //ExecutorService documentPool;
>>               // we abandoned pool as it does not seem to offer any major
>> performance benefit
>>               IndexInfo indexInfo = null;
>>  LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
>>                             while (!exit) {
>>                                     try {
>>                       int maxIndexDocs =
>> Config.getConfig().getIndex().getMaxSimultaneousDocs();
>>                       //documentPool =
>> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>>                       indexInfo = null;                          indexInfo
>> = (IndexInfo) queue.take();
>>                       if (indexInfo==EXIT_REQ) {
>>                           logger.debug("index exit req received.
>> exiting");
>>                           exit = true;
>>                           continue;
>>                       }
>>                                         indexLock.lock();
>>                       try {
>>                            openIndex();
>>                       } catch (Exception e) {
>>                            logger.error("failed to open
>> index:"+e.getMessage(),e);
>>                            return;
>>                       }
>>                       if (indexInfo==null) {
>>                           logger.debug("index info is null");
>>                       }
>>                       int i = 0;
>>                       while(indexInfo!=null && i<maxIndexDocs) {
>>                           try {
>>                               writer.addDocument(indexInfo.getDocument());
>>                           } catch (IOException io) {
>>                               logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>>                           } catch (AlreadyClosedException e) {
>>                               pushbacks.add(indexInfo);
>>                           } finally {
>>                               indexInfo.cleanup();
>>                           }
>>
>> //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
>>                                                      i++;
>>                                                        if (i<maxIndexDocs)
>> {
>>                                indexInfo = (IndexInfo) queue.poll();
>>                                                                if
>> (indexInfo==null) {
>>                                       logger.debug("index info is null");
>>                                }
>>                                                                if
>> (indexInfo==EXIT_REQ) {
>>                                        logger.debug("index exit req
>> received. exiting (2)");
>>                                       exit = true;
>>                                       break;
>>                                 }
>>                            }
>>                                                   }
>>                                             for (IndexInfo pushback :
>> pushbacks) {
>>                           try {
>>                               writer.addDocument(pushback.getDocument());
>>                           } catch (IOException io) {
>>                               logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>>                           } catch (AlreadyClosedException e) {
>>                               pushbacks.add(indexInfo);
>>                           } finally {
>>                               indexInfo.cleanup();
>>                           }
>>                           //documentPool.execute(new
>> IndexDocument(pushback,pushbacks));
>>                           i++;
>>                       }
>>                       //documentPool.shutdown();
>>
>> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>>                                          } catch (Throwable ie) {
>>                        logger.error("index write
>> interrupted:"+ie.getMessage());
>>                    } finally {
>>                          closeIndex();
>>                         indexLock.unlock();
>>                   }
>>               }                 }
>>                     public class IndexDocument extends Thread {
>>                                 IndexInfo indexInfo = null;
>>                   List<IndexInfo> pushbacks = null;
>>                                     public IndexDocument(IndexInfo
>> indexInfo,List<IndexInfo> pushbacks) {
>>                       this.indexInfo = indexInfo;
>>                       this.pushbacks = pushbacks;
>>                       setName("index document");
>>                   }
>>                                 public void run() {
>>                       try {
>>                           writer.addDocument(indexInfo.getDocument());
>>                       } catch (IOException io) {
>>                           logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>>                       } catch (AlreadyClosedException e) {
>>                           pushbacks.add(indexInfo);
>>                       }
>>                   }};
>>           }
>>           protected void closeIndex() {
>>            try {
>>                    if (writer!=null) {
>>                       writer.close();
>>                       logger.debug("writer closed");
>>                       writer = null;
>>                    }
>>                } catch (Exception io) {
>>                   logger.error("failed to close index
>> writer:"+io.getMessage(),io);
>>                }
>>       }
>>           public void deleteIndex() throws MessageSearchException {
>>                  logger.debug("delete index
>> {indexpath='"+volume.getIndexPath()+"'}");
>>                  try {
>>                     indexLock.lock();
>>                    try {
>>                       int maxIndexChars =
>> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>>                       writer = new
>> IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new
>> IndexWriter.MaxFieldLength(maxIndexChars));
>>                    } catch (Exception cie) {
>>                        logger.error("failed to delete index
>> {index='"+volume.getIndexPath()+"'}",cie);
>>                        return;
>>                    }
>>                    MessageIndex.volumeIndexes.remove(this);
>>                 } finally {
>>                       closeIndex();
>>                     indexLock.unlock();
>>               }
>>         }
>>               public void startup() {
>>           logger.debug("volumeindex is starting up");
>>           File lockFile = new
>> File(volume.getIndexPath()+File.separatorChar + "write.lock");
>>           if (lockFile.exists()) {
>>               logger.warn("The server lock file already exists. Either
>> another indexer is running or the server was not shutdown correctly.");
>>               logger.warn("If it is the latter, the lock file must be
>> manually deleted at "+lockFile.getAbsolutePath());
>>               if (indexer.getMultipleIndexProcesses()) {
>>                   logger.debug("index lock file detected on volumeindex
>> startup.");
>>               } else {
>>                   logger.warn("index lock file detected. the server was
>> shutdown incorrectly. automatically deleting lock file.");
>>                   logger.warn("indexer is configured to deal with only one
>> indexer process.");
>>                   logger.warn("if you are running more than one indexer,
>> your index could be subject to corruption.");
>>                   lockFile.delete();
>>               }
>>           }
>>           indexProcessor = new IndexProcessor();
>>           indexProcessor.start();
>>           Runtime.getRuntime().addShutdownHook(this);
>>                   }
>>                                 public void shutdown() {
>>             logger.debug("volumeindex is shutting down");
>>             queue.add(EXIT_REQ);
>>             scheduler.shutdownNow();
>>                    }
>>                 @Override
>>         public void run() {
>>             queue.add(EXIT_REQ);
>>         }
>>            }
>>
>>
>>
>> Is it possible a large merge is running?  By default IW.close waits
>> for outstanding merges to complete.  Can you post the stacktrace?
>>
>> Mike
>>
>> On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com> wrote:
>>>
>>> Hi All
>>>
>>> I have a long running situation where our indexing thread is getting
>>> stuck
>>> indefinitely in IndexWriter's close method. Yourkit shows the thread to
>>> be
>>> stuck in TIME_WAITING. Any idea's on what could be causing this?
>>> Could it be one of the streams or readers we passed to the document?
>>>
>>> I am running Lucene 2.9.0.
>>>
>>> Many thanks in advance
>>>
>>> Jamie
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
> For additional commands, e-mail: java-user-h...@lucene.apache.org
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
For additional commands, e-mail: java-user-h...@lucene.apache.org

Reply via email to