Kumaran,
Below is the code snippet for concurrent writes (i.e. concurrent
updates/deletes etc.) along with the search operation, using the NRT manager APIs.
Let me know if you need any other details or have any suggestions for me :-
public class LuceneEngineInstance implements IndexEngineInstance
{
    private final String indexName;
    private final String indexBaseDir;
    private IndexWriter indexWriter;
    private Directory luceneDirectory;
    private TrackingIndexWriter trackingIndexWriter;
    private ReferenceManager<IndexSearcher> indexSearcherReferenceManager;
    // Note: this will only scale well if most searches do not need to wait for a specific index generation.
    private final ControlledRealTimeReopenThread<IndexSearcher> indexSearcherReopenThread;
    private long reopenToken; // generation token returned by the index update/delete methods
    private static final Log log = LogFactory.getLog(LuceneEngineInstance.class);
    private static final String VERBOSE = "NO"; // read from property file
    ////////// CONSTRUCTOR & FINALIZE
    /**
     * Constructor based on an instance of the type responsible for the Lucene index persistence.
     * @param luceneDirectory the Directory implementation backing the index
     * @param writerConfig the IndexWriter configuration
     * @param indexName the name of the index
     * @param indexBaseDir the base directory of the index
     */
    public LuceneEngineInstance(Directory luceneDirectory, final IndexWriterConfig writerConfig,
                                final String indexName, final String indexBaseDir)
    {
        this.indexName = indexName;
        this.indexBaseDir = indexBaseDir;
        this.luceneDirectory = luceneDirectory;
        try
        {
            // [1]: Create the IndexWriter
            if ("YES".equalsIgnoreCase(VERBOSE))
            {
                writerConfig.setInfoStream(System.out);
            }
            indexWriter = new IndexWriter(luceneDirectory, writerConfig);

            // [2a]: Create the TrackingIndexWriter to track changes to the delegated,
            // previously created IndexWriter
            trackingIndexWriter = new TrackingIndexWriter(indexWriter);

            // [2b]: Create an IndexSearcher ReferenceManager to safely share IndexSearcher
            // instances across multiple threads.
            // Note: applyAllDeletes=true means each reopened reader is required to apply all
            // previous deletion operations (deleteDocuments or updateDocument/s) up until that point.
            indexSearcherReferenceManager = new SearcherManager(indexWriter, true, null);

            // [3]: Create the ControlledRealTimeReopenThread that reopens the index periodically,
            // taking into account the changes made to the index and tracked by the
            // TrackingIndexWriter instance.
            // The index is refreshed every 60 seconds when nobody is waiting
            // and every 100 ms when someone is waiting (see the search method).
            indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(
                trackingIndexWriter,
                indexSearcherReferenceManager,
                60.00, // when there is nobody waiting
                0.1);  // when there is someone waiting
            indexSearcherReopenThread.start(); // start the refresher thread
        }
        catch (IOException ioEx)
        {
            throw new IllegalStateException("Lucene index could not be created for: " + indexName, ioEx);
        }
    }
    ////////// INDEX
    @Override
    public void indexDocWithoutCommit(final Document doc)
    {
        Monitor addDocumentMonitor = MonitorFactory.start("SearchIndex.addDocument");
        try
        {
            reopenToken = trackingIndexWriter.addDocument(doc);
            if (log.isTraceEnabled())
            {
                log.trace("document added to the index, doc: " + doc);
            }
        }
        catch (IOException ioEx)
        {
            log.error("Error while adding the doc: " + doc, ioEx);
        }
        finally
        {
            addDocumentMonitor.stop();
        }
    }
    @Override
    public void commitDocuments()
    {
        Monitor indexCommitMonitor = MonitorFactory.start("SearchIndex.commit");
        try
        {
            indexWriter.commit();
        }
        catch (IOException ioEx)
        {
            try
            {
                log.warn("Trying rollback. Error while committing changes to Lucene index for: " + indexName, ioEx);
                indexWriter.rollback(); // TODO: handle rolled-back records
            }
            catch (IOException ioe)
            {
                log.error("Error during the rollback process.", ioe);
            }
        }
        finally
        {
            indexCommitMonitor.stop();
        }
    }
    @Override
    public void reIndexDocWithoutCommit(final Term recordIdTerm, final Document doc)
    {
        Monitor updateMonitor = MonitorFactory.start("SearchIndex.updateDoc");
        try
        {
            reopenToken = trackingIndexWriter.updateDocument(recordIdTerm, doc);
            log.trace("document re-indexed in lucene : " + recordIdTerm.text());
        }
        catch (IOException ioEx)
        {
            log.error("Error while updating the doc: " + doc, ioEx);
        }
        finally
        {
            updateMonitor.stop();
        }
    }
    /* (non-Javadoc)
     * @see com.pb.spectrum.component.index.lucene.IndexInstanceEngine#unIndex(org.apache.lucene.index.Term)
     */
    @Override
    public void unIndexDocWithoutCommit(final Term idTerm) throws IndexerException
    {
        try
        {
            reopenToken = trackingIndexWriter.deleteDocuments(idTerm);
            log.trace("Term-matching records un-indexed from lucene, " + idTerm.field() + " : " + idTerm.text());
        }
        catch (IOException ioEx)
        {
            throw new IndexerException("Error in un-index lucene operation.", ioEx);
        }
    }
    /* (non-Javadoc)
     * @see com.pb.spectrum.component.index.lucene.IndexInstanceEngine#search(org.apache.lucene.search.Query, java.util.Set, int)
     */
    @Override
    public List<Document> search(final Query qry, Set<SortField> sortFields, final int maxHits) throws SearcherException
    {
        List<Document> outputDataMapList = new ArrayList<Document>();
        Monitor searchOperationMonitor = MonitorFactory.start("SearchIndex.searchOperation");
        try
        {
            // Note: waitForGeneration(reopenToken) makes certain the IndexSearcher will see the
            // last update/delete, so this only scales well if most searches do not need to wait
            // for a specific index generation. If the latest searcher already covers the requested
            // generation, the method returns immediately; otherwise it blocks, requesting a reopen
            // (see the constructor), until the required generation has become visible in a searcher.
            indexSearcherReopenThread.waitForGeneration(reopenToken);
            IndexSearcher searcher = indexSearcherReferenceManager.acquire();
            try
            {
                // create the top-score document collector
                TopScoreDocCollector collector = TopScoreDocCollector.create(maxHits, true);
                searcher.search(qry, collector);
                ScoreDoc[] hits = collector.topDocs().scoreDocs;
                // iterate through the hits and fill the search results into the output list
                for (ScoreDoc hit : hits)
                {
                    outputDataMapList.add(searcher.doc(hit.doc));
                }
            }
            catch (IOException ioEx)
            {
                throw new SearcherException("Error while searching.", ioEx);
            }
            finally
            {
                indexSearcherReferenceManager.release(searcher);
            }
        }
        catch (IOException ioEx)
        {
            log.error("Error acquiring or releasing the searcher.", ioEx);
        }
        catch (InterruptedException intEx)
        {
            log.error("Interrupted while waiting for the index generation.", intEx);
            Thread.currentThread().interrupt();
        }
        finally
        {
            searchOperationMonitor.stop();
        }
        return outputDataMapList;
    }
...
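For completeness, here is a minimal usage sketch of the class above. It is only an
illustration on my side: the directory path, index name, and field values are made-up
placeholders, and it assumes the Lucene 4.8 constructors shown.

import java.io.File;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class LuceneEngineInstanceDemo
{
    public static void main(String[] args) throws Exception
    {
        // placeholder path and index name
        FSDirectory dir = FSDirectory.open(new File("/tmp/demo-index"));
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_48,
            new StandardAnalyzer(Version.LUCENE_48));
        LuceneEngineInstance engine = new LuceneEngineInstance(dir, config, "demo", "/tmp/demo-index");

        // add and commit one document
        Document doc = new Document();
        doc.add(new StringField("id", "42", Field.Store.YES));
        engine.indexDocWithoutCommit(doc);
        engine.commitDocuments();

        // the search waits (via waitForGeneration) until the update above is visible
        System.out.println(engine.search(new TermQuery(new Term("id", "42")), null, 10));
    }
}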
On Tue, Aug 5, 2014 at 12:51 PM, Kumaran Ramasubramanian <[email protected]> wrote:
> Hi Gaurav
>
> Thanks for the clarification. If possible, please share your NRT
> manager API related code example. I believe it will help me understand it a
> little better.
>
>
> -
> Kumaran R
>
> On Tue, Aug 5, 2014 at 12:39 PM, Gaurav gupta <[email protected]> wrote:
>
> > Thanks Kumaran and Erick for resolving my queries.
> >
> > Kumaran,
> > You are right that only one IndexWriter can write, since it acquires the
> > lock, but using the NRT manager APIs - TrackingIndexWriter
> > <http://lucene.apache.org/core/4_4_0/core/org/apache/lucene/index/TrackingIndexWriter.html>
> > - multiple concurrent updates/deletes/appends are possible.
> >
> > Thanks again !
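To make that concrete, here is a minimal sketch of several threads driving updates
through the single shared TrackingIndexWriter (via the LuceneEngineInstance above).
The thread count, loop bound, and field names are made up for illustration:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.Term;

public class ConcurrentUpdateSketch
{
    // "engine" is assumed to be a fully constructed LuceneEngineInstance from the snippet above
    public static void run(final LuceneEngineInstance engine) throws InterruptedException
    {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++)
        {
            final String id = String.valueOf(i);
            pool.submit(new Runnable()
            {
                public void run()
                {
                    Document doc = new Document();
                    doc.add(new StringField("id", id, Field.Store.YES));
                    // updateDocument is an atomic delete-then-add, and TrackingIndexWriter
                    // hands back a generation token, so no external locking is needed here
                    engine.reIndexDocWithoutCommit(new Term("id", id), doc);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        engine.commitDocuments();
    }
}

One caveat worth noting: the reopenToken field in my class is a plain long updated from
many threads, so a concurrent search may wait on a slightly stale generation.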
> >
> > On Mon, Aug 4, 2014 at 10:29 PM, Erick Erickson <[email protected]> wrote:
> >
> > > Right.
> > > 1> Occasionally the merge will require 2x the disk space (3x with the
> > > compound file system). The merging is, indeed, done in the background; it is
> > > NOT a blocking operation.
> > >
> > > 2> n/a. It shouldn't block at all.
> > >
> > > Here's a cool video by Mike McCandless on the merging process, plus some
> > > explanations:
> > >
> > > http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html
> > >
> > > Best,
> > > Erick
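Following up on Erick's point, here is a small sketch of how merge behavior could be
tuned on the IndexWriterConfig before constructing the writer. The policy, scheduler,
and all values below are arbitrary examples on my side, not recommendations from this
thread:

import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;

public class MergeTuningSketch
{
    // the values below are arbitrary examples, not recommendations
    public static IndexWriterConfig tuneMerging(IndexWriterConfig writerConfig)
    {
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        mergePolicy.setMaxMergedSegmentMB(1024.0); // cap the size of merged segments
        mergePolicy.setSegmentsPerTier(10.0);      // segments allowed per tier before a merge is selected

        ConcurrentMergeScheduler scheduler = new ConcurrentMergeScheduler();
        scheduler.setMaxMergesAndThreads(4, 2);    // max queued merges, max concurrent merge threads

        writerConfig.setMergePolicy(mergePolicy);
        // with this scheduler, merges run on background threads and do not block updates or searches
        writerConfig.setMergeScheduler(scheduler);
        return writerConfig;
    }
}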
> > >
> > > On Mon, Aug 4, 2014 at 8:45 AM, Kumaran R <[email protected]> wrote:
> > >
> > > > Hi Gaurav
> > > >
> > > > 1. When you open an index for writing, there will be a lock preventing
> > > > further writes until you close that index, but not for searches. During a
> > > > merge the index needs 3X (not sure, 2X?) more storage space; I believe
> > > > that is the reason searches are not blocked. (Any other experts can
> > > > clarify this further.)
> > > >
> > > > 2. Merging is taken care of by Lucene's default values (merge factor 2).
> > > > If you need more control over the merge policy, read up on merging by
> > > > size, by number of segments, or the many other merge policies.
> > > >
> > > >
> > > > Hope this helps you a little bit.
> > > >
> > > > --
> > > > Kumaran R
> > > > Sent from Phone
> > > >
> > > > > On 04-Aug-2014, at 8:04 pm, Gaurav gupta <[email protected]> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > We are planning to use Lucene 4.8.1 over Oracle (1 to 2 TB of data) and are
> > > > > seeking information on how Lucene conducts housekeeping or maintenance of
> > > > > indexes over a period of time. *Is it a blocking operation for write and
> > > > > search, or will it not block anything while merging is going on?*
> > > > >
> > > > > I found :- *"Since Lucene adds the updated document to the index and marks
> > > > > all previous versions as deleted, to get rid of deleted documents Lucene
> > > > > needs to do some housekeeping over a period of time. Under the hood, from
> > > > > time to time segments are merged into (usually) bigger segments using a
> > > > > configurable MergePolicy
> > > > > <http://lucene.apache.org/java/3_4_0/api/core/org/apache/lucene/index/MergePolicy>
> > > > > (TieredMergePolicy)."*
> > > > >
> > > > > 1- Is it a blocking operation for both write and search, or will it not
> > > > > block anything while merging is going on?
> > > > >
> > > > > 2- What is the best practice to avoid any blocking on production servers?
> > > > > Not sure how Solr or Elasticsearch handles it.
> > > > > Should we control the merging by calling *forceMerge(int) at low-traffic
> > > > > times* to avoid any unpredictable blocking operation? Is that recommended,
> > > > > or does Lucene do intelligent merging and not block anything (updates and
> > > > > searches), or are there ways to reduce the blocking time to a very small
> > > > > duration (1-2 minutes) using some API or daemon thread etc.?
> > > > >
> > > > > Looking for your professional guidance on it.
> > > > >
> > > > > Regards
> > > > > Gaurav
> > > >