Demon has submitted this change and it was merged.

Change subject: Revert "Fix for bug 45266. Needs parallel changes to OAI."
......................................................................


Revert "Fix for bug 45266. Needs parallel changes to OAI."

This reverts commit 24eb3e33ebe867c2a763a44405571ac556426a36

Change-Id: Id2582771cef5520e887551fb3f58424d11e4524c
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
2 files changed, 17 insertions(+), 124 deletions(-)

Approvals:
  Demon: Looks good to me, approved
  Ram: Looks good to me, but someone else must approve
  jenkins-bot: Verified



diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index fc3c508..6028fa2 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -85,14 +85,6 @@
         * @param args
         */
        public static void main(String[] args){
-               // only for debugging: allows use of a different log file from 
the main process
-               for (int i =0; i < args.length; i++) {
-                       if (args[i].equals("-configfile")) {
-                               Configuration.setConfigFile(args[++i]);
-                               break;
-                       }
-               }
-
                // config
                Configuration config = Configuration.open();
                GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -113,8 +105,7 @@
                boolean requestSnapshot = false;
                String noOptimizationDBlistFile = null;
                HashSet<String> noOptimizationDBs = new HashSet<String>();
-               String seq_first = null;    // first record to fetch
-
+               
                // args
                for(int i=0; i<args.length; i++){
                        if(args[i].equals("-d"))
@@ -139,12 +130,8 @@
                                requestSnapshot = true;
                        else if(args[i].equals("-nof"))
                                noOptimizationDBlistFile = args[++i];
-                       else if(args[i].equals("-q"))
-                               seq_first = args[++i];
                        else if(args[i].equals("--help"))
                                break;
-                       else if(args[i].equals("-configfile"))
-                               ++i;  // skip argument
                        else if(args[i].startsWith("-")){
                                System.out.println("Unrecognized switch 
"+args[i]);
                                return;
@@ -172,7 +159,6 @@
                        System.out.println("  -ef  - exclude db names listed in 
dblist file");
                        System.out.println("  -sn  - immediately make 
unoptimized snapshot as updates finish ");
                        System.out.println("  -nof - use with -sn to specify a 
file with databases not to be optimized");
-                       System.out.println("  -q   - sequence number to start 
from");
                        return;
                }
                // preload
@@ -185,16 +171,13 @@
                
                maxQueueSize = config.getInt("OAI","maxqueue",500);
                bufferDocs = config.getInt("OAI","bufferdocs",50);
-               log.trace( String.format( "maxQueueSize = %d, bufferDocs = 
%d\n", maxQueueSize, bufferDocs ) );
                firstPass.addAll(dbnames);
                // update
                do{
                        main_loop: for(String dbname : dbnames){
                                try{
-                                       if(excludeList.contains(dbname)) {
-                                               log.trace( String.format( "%s 
in excludeList, skipped\n", dbname ) );
+                                       if(excludeList.contains(dbname))
                                                continue;
-                                       }
                                        IndexId iid = IndexId.get(dbname);
                                        OAIHarvester harvester = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
                                        OAIHarvester harvesterSingle = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
@@ -210,38 +193,15 @@
                                        } catch (IOException e) {
                                                log.warn("I/O error reading 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }                               
-
-                                       // fetch next batch of records based on 
sequence number (new scheme) or
-                                       // timestamp (new scheme)
-                                       //
-                                       String from = null,
-                                              seq_next = null;
-                                       ArrayList<IndexUpdateRecord> records = 
null;
-                                       if ( firstPass.contains( dbname ) ) {
-                                               if ( null != seq_first ) {
-                                                       seq_next = seq_first;
-                                               } else if ( null != timestamp ) 
{
-                                                       from = timestamp;
-                                               }
-                                       }
-                                       if ( null == seq_next && null == from ) 
{
-                                               seq_next = status.getProperty( 
"sequence" );
-                                               if ( null == seq_next ) {
-                                                       from = 
status.getProperty("timestamp",defaultTimestamp);
-                                               }
-                                       }
-                                       if ( null != seq_next ) {  // working 
with sequence numbers
-                                               log.info( "Resuming update of 
"+ iid + ", seq = " + seq_next );
-                                               records = 
harvester.getRecordsSeq( seq_next, bufferDocs );
-                                       } else {              // working with 
timestamps
-                                               log.info( "Resuming update of " 
+ iid + ", from = " + from );
-                                               records = harvester.getRecords( 
from, bufferDocs );
-                                       }
-
-                                       if(records.size() == 0) {
-                                               log.trace( String.format( "No 
records\n" ) );
+                                       String from;
+                                       if(firstPass.contains(dbname) && 
timestamp!=null)
+                                               from = timestamp;
+                                       else
+                                               from = 
status.getProperty("timestamp",defaultTimestamp);
+                                       log.info("Resuming update of "+iid+" 
from "+from);
+                                       ArrayList<IndexUpdateRecord> records = 
harvester.getRecords(from,bufferDocs);
+                                       if(records.size() == 0)
                                                continue;
-                                       }
                                        boolean hasMore = false;
                                        do{
                                                // send to indexer
@@ -274,21 +234,14 @@
                                                if(hasMore){
                                                        log.info("Fetching more 
records...");
                                                        records = 
harvester.getMoreRecords(bufferDocs);
-
-                                                        if(records.size() == 
0) {
-                                                            log.trace( 
String.format( "Unexpected: hasMore is true but no records\n" ) );
-                                                            break;
-                                                        }
                                                }
                                        } while(hasMore);
 
                                        // see if we need to wait for 
notification
-                                       log.trace( String.format( "notification 
= %s\n", notification ) );
                                        if(notification){
                                                RMIMessengerClient messenger = 
new RMIMessengerClient(true);
                                                String host = 
iid.getIndexHost();
                                                boolean req = 
messenger.requestFlushAndNotify(dbname,host);
-                                               log.trace( String.format( "req 
= %s\n", req ) );
                                                if(req){
                                                        log.info("Waiting for 
flush notification for "+dbname);
                                                        Boolean succ = null;
@@ -325,16 +278,14 @@
                                                        continue main_loop;
                                        }
                                        
-                                       // write updated sequence number 
(timestamp not needed)
-                                       status.setProperty( "sequence", 
harvester.getSequence() );
-                                       status.setProperty( "timestamp", 
harvester.getResponseDate() );
+                                       // write updated timestamp
+                                       
status.setProperty("timestamp",harvester.getResponseDate());
                                        try {
                                                if(!statf.exists())
                                                        
statf.getParentFile().mkdirs();
                                                FileOutputStream fileos = new 
FileOutputStream(statf,false);
                                                status.store(fileos,"Last 
incremental update timestamp");
                                                fileos.close();
-                                               log.trace( String.format( 
"stored sequence to status file\n" ) );
                                        } catch (IOException e) {
                                                log.warn("I/O error writing 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }
@@ -398,4 +349,4 @@
 
        }
        
-}
+}
\ No newline at end of file
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 1796adf..63ac868 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,13 +1,8 @@
 package org.wikimedia.lsearch.oai;
 
 import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.io.IOException;
-
+import java.io.InputStream;
 import java.net.Authenticator;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -37,30 +32,6 @@
        /** number of retries before giving up, useful when there are broken 
servers in the cluster */
        protected int retries = 5;
        
-       // for debugging
-       // save contents of input stream to memory stream and dump to file
-       private static final boolean DBG = false;
-       private static int fnum = 1;
-       public static InputStream toMem( InputStream is ) throws IOException {
-           ByteArrayOutputStream os = new ByteArrayOutputStream();
-           // open new dump file
-           File dir = new File( "/var/tmp" );
-           String
-               pfx = String.format( "oai_%d_", fnum++ ),
-               sfx = ".xml";
-           File dumpfile = File.createTempFile( pfx, sfx, dir );
-           FileOutputStream fos = new FileOutputStream( dumpfile );
-
-           // read bytes
-           int c;
-           while ( -1 != (c = is.read()) ) {
-               os.write( c ); fos.write( c );
-           }
-           fos.close();
-           ByteArrayInputStream bis = new ByteArrayInputStream( 
os.toByteArray() );
-           return bis;
-       }  // toMem
-
        public OAIHarvester(IndexId iid, String url, Authenticator auth) throws 
MalformedURLException{
                this.urlbase = url;
                this.iid = iid;
@@ -99,19 +70,12 @@
                                // set some timeouts
                                urlConn.setReadTimeout(60 * 1000); // 60 seconds
                                urlConn.setConnectTimeout(60 * 1000); // 60 
seconds
-                               InputStream in;
-                               if ( ! DBG ) {
-                                   in = new BufferedInputStream( 
urlConn.getInputStream() );
-                               } else {
-                                   in = toMem( urlConn.getInputStream() );
-                               }
-
+                               InputStream in = new 
BufferedInputStream(urlConn.getInputStream());
                                parser = new OAIParser(in,collector);
                                parser.parse();
                                resumptionToken = parser.getResumptionToken();
                                responseDate = parser.getResponseDate();
                                in.close();
-                               log.trace( String.format( "resumptionToken = 
%s, responseDate = %s", resumptionToken, responseDate ) );
                                break;
                        } catch(IOException e){                         
                                if(tryNum == this.retries)
@@ -127,8 +91,7 @@
                ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
                try{                    
                        do{
-                               URL url = new URL( urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + getSequence() );
-                               read( url );
+                               read(new 
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
                                ret.addAll(collector.getRecords());
                        } while(hasMore() && ret.size() < atLeast);
                } catch(IOException e){
@@ -138,29 +101,8 @@
                return ret;
        }
        
-       /** Invoke ListRecords using the last resumption token, get atLeast num 
of records */
-       public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int 
atLeast ) {
-               ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
-               try{                    
-                       do{
-                               read( new URL( urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + seq ) );
-                               ret.addAll( collector.getRecords() );
-                       } while( hasMore() && ret.size() < atLeast );
-               } catch( IOException e ){
-                       log.warn( "I/O exception listing records: " + 
e.getMessage(), e );
-                       return null;
-               }
-               return ret;
-       }
-       
        public boolean hasMore(){
-               return resumptionToken.endsWith(":");
-       }
-       
-       public String getSequence(){
-               return resumptionToken.endsWith(":")
-                       ? resumptionToken.substring( 0, 
resumptionToken.length() - 1 )
-                       : resumptionToken;
+               return !resumptionToken.equals("");
        }
        
        public String getResponseDate(){

-- 
To view, visit https://gerrit.wikimedia.org/r/53385
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id2582771cef5520e887551fb3f58424d11e4524c
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Demon <ch...@wikimedia.org>
Gerrit-Reviewer: Demon <ch...@wikimedia.org>
Gerrit-Reviewer: Ram <r...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to