Ram has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/51077


Change subject: Fix for bug 45266. Needs parallel changes to OAI.
......................................................................

Fix for bug 45266. Needs parallel changes to OAI.

See change id: I07d5c2cedcd7550505b53d380d13111bd83e3216
in OAI extension.

Change-Id: I31bde27a7a64e0d9fff340843f56fe6c6d8a322a
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
2 files changed, 130 insertions(+), 20 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/lucene-search-2 
refs/changes/77/51077/1

diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java 
b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index 6028fa2..4156977 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -85,6 +85,14 @@
         * @param args
         */
        public static void main(String[] args){
+                // only for debugging: -configfile selects a different config file (e.g. one with a different log file) from 
the main process
+               for (int i =0; i < args.length; i++) {
+                       if (args[i].equals("-configfile")) {
+                               Configuration.setConfigFile(args[++i]);
+                                break;
+                       }
+               }
+
                // config
                Configuration config = Configuration.open();
                GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -105,7 +113,8 @@
                boolean requestSnapshot = false;
                String noOptimizationDBlistFile = null;
                HashSet<String> noOptimizationDBs = new HashSet<String>();
-               
+                String seq_first = null;    // first record to fetch
+
                // args
                for(int i=0; i<args.length; i++){
                        if(args[i].equals("-d"))
@@ -130,8 +139,12 @@
                                requestSnapshot = true;
                        else if(args[i].equals("-nof"))
                                noOptimizationDBlistFile = args[++i];
+                       else if(args[i].equals("-q"))
+                               seq_first = args[++i];
                        else if(args[i].equals("--help"))
                                break;
+                       else if(args[i].equals("-configfile"))
+                            ++i;  // skip argument
                        else if(args[i].startsWith("-")){
                                System.out.println("Unrecognized switch 
"+args[i]);
                                return;
@@ -159,6 +172,7 @@
                        System.out.println("  -ef  - exclude db names listed in 
dblist file");
                        System.out.println("  -sn  - immediately make 
unoptimized snapshot as updates finish ");
                        System.out.println("  -nof - use with -sn to specify a 
file with databases not to be optimized");
+                       System.out.println("  -q   - sequence number to start 
from");
                        return;
                }
                // preload
@@ -171,13 +185,19 @@
                
                maxQueueSize = config.getInt("OAI","maxqueue",500);
                bufferDocs = config.getInt("OAI","bufferdocs",50);
+
+                bufferDocs = 2; // for debugging, remove later
+
+                log.trace( String.format( "maxQueueSize = %d, bufferDocs = 
%d\n", maxQueueSize, bufferDocs ) );
                firstPass.addAll(dbnames);
                // update
                do{
                        main_loop: for(String dbname : dbnames){
                                try{
-                                       if(excludeList.contains(dbname))
-                                               continue;
+                                    if(excludeList.contains(dbname)) {
+                                        log.trace( String.format( "%s in 
excludeList, skipped\n", dbname ) );
+                                        continue;
+                                    }
                                        IndexId iid = IndexId.get(dbname);
                                        OAIHarvester harvester = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
                                        OAIHarvester harvesterSingle = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
@@ -193,15 +213,38 @@
                                        } catch (IOException e) {
                                                log.warn("I/O error reading 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }                               
-                                       String from;
-                                       if(firstPass.contains(dbname) && 
timestamp!=null)
-                                               from = timestamp;
-                                       else
-                                               from = 
status.getProperty("timestamp",defaultTimestamp);
-                                       log.info("Resuming update of "+iid+" 
from "+from);
-                                       ArrayList<IndexUpdateRecord> records = 
harvester.getRecords(from,bufferDocs);
-                                       if(records.size() == 0)
-                                               continue;
+
+                                        // fetch next batch of records based 
on sequence number (new scheme) or
+                                        // timestamp (old scheme)
+                                        //
+                                       String from = null,
+                                               seq_next = null;
+                                       ArrayList<IndexUpdateRecord> records = 
null;
+                                       if ( firstPass.contains( dbname ) ) {
+                                            if ( null != seq_first ) {
+                                                seq_next = seq_first;
+                                            } else if ( null != timestamp ) {
+                                                from = timestamp;
+                                            }
+                                        }
+                                        if ( null == seq_next && null == from 
) {
+                                            seq_next = status.getProperty( 
"sequence" );
+                                            if ( null == seq_next ) {
+                                                from = 
status.getProperty("timestamp",defaultTimestamp);
+                                            }
+                                        }
+                                        if ( null != seq_next ) {  // working 
with sequence numbers
+                                            log.info( "Resuming update of "+ 
iid + ", seq = " + seq_next );
+                                            records = harvester.getRecordsSeq( 
seq_next, bufferDocs );
+                                        } else {              // working with 
timestamps
+                                            log.info( "Resuming update of " + 
iid + ", from = " + from );
+                                            records = harvester.getRecords( 
from, bufferDocs );
+                                        }
+
+                                       if(records.size() == 0) {
+                                            log.trace( String.format( "No 
records\n" ) );
+                                            continue;
+                                        }
                                        boolean hasMore = false;
                                        do{
                                                // send to indexer
@@ -234,14 +277,21 @@
                                                if(hasMore){
                                                        log.info("Fetching more 
records...");
                                                        records = 
harvester.getMoreRecords(bufferDocs);
+
+                                                        if(records.size() == 
0) {
+                                                            log.trace( 
String.format( "Unexpected: hasMore is true but no records\n" ) );
+                                                            break;
+                                                        }
                                                }
                                        } while(hasMore);
 
                                        // see if we need to wait for 
notification
+                                        log.trace( String.format( 
"notification = %s\n", notification ) );
                                        if(notification){
                                                RMIMessengerClient messenger = 
new RMIMessengerClient(true);
                                                String host = 
iid.getIndexHost();
                                                boolean req = 
messenger.requestFlushAndNotify(dbname,host);
+                                                log.trace( String.format( "req 
= %s\n", req ) );
                                                if(req){
                                                        log.info("Waiting for 
flush notification for "+dbname);
                                                        Boolean succ = null;
@@ -278,14 +328,16 @@
                                                        continue main_loop;
                                        }
                                        
-                                       // write updated timestamp
-                                       
status.setProperty("timestamp",harvester.getResponseDate());
+                                       // write updated sequence number 
(timestamp is still stored for the timestamp-based fallback)
+                                       status.setProperty( "sequence", 
harvester.getSequence() );
+                                        status.setProperty( "timestamp", 
harvester.getResponseDate() );
                                        try {
                                                if(!statf.exists())
                                                        
statf.getParentFile().mkdirs();
                                                FileOutputStream fileos = new 
FileOutputStream(statf,false);
                                                status.store(fileos,"Last 
incremental update timestamp");
                                                fileos.close();
+                                                log.trace( String.format( 
"stored sequence to status file\n" ) );
                                        } catch (IOException e) {
                                                log.warn("I/O error writing 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }
@@ -349,4 +401,4 @@
 
        }
        
-}
\ No newline at end of file
+}
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java 
b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 63ac868..ce11d72 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,8 +1,13 @@
 package org.wikimedia.lsearch.oai;
 
 import java.io.BufferedInputStream;
-import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.InputStream;
+import java.io.IOException;
+
 import java.net.Authenticator;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -32,6 +37,30 @@
        /** number of retries before giving up, useful when there are broken 
servers in the cluster */
        protected int retries = 5;
        
+        // for debugging
+        // save contents of input stream to memory stream and dump to file
+        private static final boolean DBG = true;
+        private static int fnum = 1;
+        public static InputStream toMem( InputStream is ) throws IOException {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            // open new dump file
+            File dir = new File( "/var/tmp" );
+            String
+                pfx = String.format( "oai_%d_", fnum++ ),
+                sfx = ".xml";
+            File dumpfile = File.createTempFile( pfx, sfx, dir );
+            FileOutputStream fos = new FileOutputStream( dumpfile );
+
+            // read bytes
+            int c;
+            while ( -1 != (c = is.read()) ) {
+                os.write( c ); fos.write( c );
+            }
+            fos.close();
+            ByteArrayInputStream bis = new ByteArrayInputStream( 
os.toByteArray() );
+            return bis;
+        }  // toMem
+
        public OAIHarvester(IndexId iid, String url, Authenticator auth) throws 
MalformedURLException{
                this.urlbase = url;
                this.iid = iid;
@@ -70,12 +99,19 @@
                                // set some timeouts
                                urlConn.setReadTimeout(60 * 1000); // 60 seconds
                                urlConn.setConnectTimeout(60 * 1000); // 60 
seconds
-                               InputStream in = new 
BufferedInputStream(urlConn.getInputStream());
+                                InputStream in;
+                                if ( ! DBG ) {
+                                    in = new BufferedInputStream( 
urlConn.getInputStream() );
+                                } else {
+                                    in = toMem( urlConn.getInputStream() );
+                                }
+
                                parser = new OAIParser(in,collector);
                                parser.parse();
                                resumptionToken = parser.getResumptionToken();
                                responseDate = parser.getResponseDate();
                                in.close();
+                                log.trace( String.format( "resumptionToken = 
%s, responseDate = %s", resumptionToken, responseDate ) );
                                break;
                        } catch(IOException e){                         
                                if(tryNum == this.retries)
@@ -91,8 +127,9 @@
                ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
                try{                    
                        do{
-                               read(new 
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
-                               ret.addAll(collector.getRecords());
+                            URL url = new URL( urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + getSequence() );
+                            read( url );
+                           ret.addAll(collector.getRecords());
                        } while(hasMore() && ret.size() < atLeast);
                } catch(IOException e){
                        log.warn("I/O exception listing records: 
"+e.getMessage(),e);
@@ -101,8 +138,29 @@
                return ret;
        }
        
+       /** Invoke ListRecords using the given sequence number as the resumption token, get atLeast num 
of records */
+        public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int 
atLeast ) {
+               ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
+               try{                    
+                       do{
+                               read( new URL( urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + seq ) );
+                               ret.addAll( collector.getRecords() );
+                       } while( hasMore() && ret.size() < atLeast );
+               } catch( IOException e ){
+                       log.warn( "I/O exception listing records: " + 
e.getMessage(), e );
+                       return null;
+               }
+               return ret;
+       }
+       
        public boolean hasMore(){
-               return !resumptionToken.equals("");
+               return resumptionToken.endsWith(":");
+       }
+       
+       public String getSequence(){
+            return resumptionToken.endsWith(":")
+                ? resumptionToken.substring( 0, resumptionToken.length() - 1 )
+                : resumptionToken;
        }
        
        public String getResponseDate(){

-- 
To view, visit https://gerrit.wikimedia.org/r/51077
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I31bde27a7a64e0d9fff340843f56fe6c6d8a322a
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Ram <r...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to