Ram has uploaded a new change for review. https://gerrit.wikimedia.org/r/51077
Change subject: Fix for bug 45266. Needs parallel changes to OAI. ...................................................................... Fix for bug 45266. Needs parallel changes to OAI. See change id: I07d5c2cedcd7550505b53d380d13111bd83e3216 in OAI extension. Change-Id: I31bde27a7a64e0d9fff340843f56fe6c6d8a322a --- M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java M src/org/wikimedia/lsearch/oai/OAIHarvester.java 2 files changed, 130 insertions(+), 20 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/debs/lucene-search-2 refs/changes/77/51077/1 diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java index 6028fa2..4156977 100755 --- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java +++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java @@ -85,6 +85,14 @@ * @param args */ public static void main(String[] args){ + // only for debugging: allows use of a different log file from the main process + for (int i =0; i < args.length; i++) { + if (args[i].equals("-configfile")) { + Configuration.setConfigFile(args[++i]); + break; + } + } + // config Configuration config = Configuration.open(); GlobalConfiguration global = GlobalConfiguration.getInstance(); @@ -105,7 +113,8 @@ boolean requestSnapshot = false; String noOptimizationDBlistFile = null; HashSet<String> noOptimizationDBs = new HashSet<String>(); - + String seq_first = null; // first record to fetch + // args for(int i=0; i<args.length; i++){ if(args[i].equals("-d")) @@ -130,8 +139,12 @@ requestSnapshot = true; else if(args[i].equals("-nof")) noOptimizationDBlistFile = args[++i]; + else if(args[i].equals("-q")) + seq_first = args[++i]; else if(args[i].equals("--help")) break; + else if(args[i].equals("-configfile")) + ++i; // skip argument else if(args[i].startsWith("-")){ System.out.println("Unrecognized switch "+args[i]); return; @@ -159,6 +172,7 @@ System.out.println(" -ef - exclude db names listed in dblist 
file"); System.out.println(" -sn - immediately make unoptimized snapshot as updates finish "); System.out.println(" -nof - use with -sn to specify a file with databases not to be optimized"); + System.out.println(" -q - sequence number to start from"); return; } // preload @@ -171,13 +185,19 @@ maxQueueSize = config.getInt("OAI","maxqueue",500); bufferDocs = config.getInt("OAI","bufferdocs",50); + + bufferDocs = 2; // for debugging, remove later + + log.trace( String.format( "maxQueueSize = %d, bufferDocs = %d\n", maxQueueSize, bufferDocs ) ); firstPass.addAll(dbnames); // update do{ main_loop: for(String dbname : dbnames){ try{ - if(excludeList.contains(dbname)) - continue; + if(excludeList.contains(dbname)) { + log.trace( String.format( "%s in excludeList, skipped\n", dbname ) ); + continue; + } IndexId iid = IndexId.get(dbname); OAIHarvester harvester = new OAIHarvester(iid,iid.getOAIRepository(),auth); OAIHarvester harvesterSingle = new OAIHarvester(iid,iid.getOAIRepository(),auth); @@ -193,15 +213,38 @@ } catch (IOException e) { log.warn("I/O error reading status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e); } - String from; - if(firstPass.contains(dbname) && timestamp!=null) - from = timestamp; - else - from = status.getProperty("timestamp",defaultTimestamp); - log.info("Resuming update of "+iid+" from "+from); - ArrayList<IndexUpdateRecord> records = harvester.getRecords(from,bufferDocs); - if(records.size() == 0) - continue; + + // fetch next batch of records based on sequence number (new scheme) or + // timestamp (old scheme) + // + String from = null, + seq_next = null; + ArrayList<IndexUpdateRecord> records = null; + if ( firstPass.contains( dbname ) ) { + if ( null != seq_first ) { + seq_next = seq_first; + } else if ( null != timestamp ) { + from = timestamp; + } + } + if ( null == seq_next && null == from ) { + seq_next = status.getProperty( "sequence" ); + if ( null == seq_next ) { + from =
status.getProperty("timestamp",defaultTimestamp); + } + } + if ( null != seq_next ) { // working with sequence numbers + log.info( "Resuming update of "+ iid + ", seq = " + seq_next ); + records = harvester.getRecordsSeq( seq_next, bufferDocs ); + } else { // working with timestamps + log.info( "Resuming update of " + iid + ", from = " + from ); + records = harvester.getRecords( from, bufferDocs ); + } + + if(records.size() == 0) { + log.trace( String.format( "No records\n" ) ); + continue; + } boolean hasMore = false; do{ // send to indexer @@ -234,14 +277,21 @@ if(hasMore){ log.info("Fetching more records..."); records = harvester.getMoreRecords(bufferDocs); + + if(records.size() == 0) { + log.trace( String.format( "Unexpected: hasMore is true but no records\n" ) ); + break; + } } } while(hasMore); // see if we need to wait for notification + log.trace( String.format( "notification = %s\n", notification ) ); if(notification){ RMIMessengerClient messenger = new RMIMessengerClient(true); String host = iid.getIndexHost(); boolean req = messenger.requestFlushAndNotify(dbname,host); + log.trace( String.format( "req = %s\n", req ) ); if(req){ log.info("Waiting for flush notification for "+dbname); Boolean succ = null; @@ -278,14 +328,16 @@ continue main_loop; } - // write updated timestamp - status.setProperty("timestamp",harvester.getResponseDate()); + // write updated sequence number (timestamp not needed) + status.setProperty( "sequence", harvester.getSequence() ); + status.setProperty( "timestamp", harvester.getResponseDate() ); try { if(!statf.exists()) statf.getParentFile().mkdirs(); FileOutputStream fileos = new FileOutputStream(statf,false); status.store(fileos,"Last incremental update timestamp"); fileos.close(); + log.trace( String.format( "stored sequence to status file\n" ) ); } catch (IOException e) { log.warn("I/O error writing status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e); } @@ -349,4 +401,4 @@ } -} \ No newline at end of file 
+} diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java b/src/org/wikimedia/lsearch/oai/OAIHarvester.java index 63ac868..ce11d72 100755 --- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java +++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java @@ -1,8 +1,13 @@ package org.wikimedia.lsearch.oai; import java.io.BufferedInputStream; -import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.InputStream; +import java.io.IOException; + import java.net.Authenticator; import java.net.MalformedURLException; import java.net.URL; @@ -32,6 +37,30 @@ /** number of retries before giving up, useful when there are broken servers in the cluster */ protected int retries = 5; + // for debugging + // save contents of input stream to memory stream and dump to file + private static final boolean DBG = true; + private static int fnum = 1; + public static InputStream toMem( InputStream is ) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + // open new dump file + File dir = new File( "/var/tmp" ); + String + pfx = String.format( "oai_%d_", fnum++ ), + sfx = ".xml"; + File dumpfile = File.createTempFile( pfx, sfx, dir ); + FileOutputStream fos = new FileOutputStream( dumpfile ); + + // read bytes + int c; + while ( -1 != (c = is.read()) ) { + os.write( c ); fos.write( c ); + } + fos.close(); + ByteArrayInputStream bis = new ByteArrayInputStream( os.toByteArray() ); + return bis; + } // toMem + public OAIHarvester(IndexId iid, String url, Authenticator auth) throws MalformedURLException{ this.urlbase = url; this.iid = iid; @@ -70,12 +99,19 @@ // set some timeouts urlConn.setReadTimeout(60 * 1000); // 60 seconds urlConn.setConnectTimeout(60 * 1000); // 60 seconds - InputStream in = new BufferedInputStream(urlConn.getInputStream()); + InputStream in; + if ( ! 
DBG ) { + in = new BufferedInputStream( urlConn.getInputStream() ); + } else { + in = toMem( urlConn.getInputStream() ); + } + parser = new OAIParser(in,collector); parser.parse(); resumptionToken = parser.getResumptionToken(); responseDate = parser.getResponseDate(); in.close(); + log.trace( String.format( "resumptionToken = %s, responseDate = %s", resumptionToken, responseDate ) ); break; } catch(IOException e){ if(tryNum == this.retries) @@ -91,8 +127,9 @@ ArrayList<IndexUpdateRecord> ret = new ArrayList<IndexUpdateRecord>(); try{ do{ - read(new URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken)); - ret.addAll(collector.getRecords()); + URL url = new URL( urlbase + "&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + getSequence() ); + read( url ); + ret.addAll(collector.getRecords()); } while(hasMore() && ret.size() < atLeast); } catch(IOException e){ log.warn("I/O exception listing records: "+e.getMessage(),e); @@ -101,8 +138,29 @@ return ret; } + /** Invoke ListRecords starting from the given sequence number (sent as the resumptionToken parameter), get atLeast num of records; returns null on I/O error */ + public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int atLeast ) { + ArrayList<IndexUpdateRecord> ret = new ArrayList<IndexUpdateRecord>(); + try{ + do{ + read( new URL( urlbase + "&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + seq ) ); + ret.addAll( collector.getRecords() ); + } while( hasMore() && ret.size() < atLeast ); + } catch( IOException e ){ + log.warn( "I/O exception listing records: " + e.getMessage(), e ); + return null; + } + return ret; + } + public boolean hasMore(){ - return !resumptionToken.equals(""); + return resumptionToken.endsWith(":"); + } + + public String getSequence(){ + return resumptionToken.endsWith(":") + ?
resumptionToken.substring( 0, resumptionToken.length() - 1 ) + : resumptionToken; } public String getResponseDate(){ -- To view, visit https://gerrit.wikimedia.org/r/51077 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I31bde27a7a64e0d9fff340843f56fe6c6d8a322a Gerrit-PatchSet: 1 Gerrit-Project: operations/debs/lucene-search-2 Gerrit-Branch: master Gerrit-Owner: Ram <r...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits