Author: siren
Date: Tue Jun 27 12:34:20 2006
New Revision: 417567
URL: http://svn.apache.org/viewvc?rev=417567&view=rev
Log:
NUTCH-306 fix for concurrency problem contributed by Grant Glouser
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=417567&r1=417566&r2=417567&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Jun 27 12:34:20 2006
@@ -82,7 +82,7 @@
Runnable {
private InetSocketAddress[] defaultAddresses;
- private InetSocketAddress[] liveAddresses;
+ private boolean[] liveServer;
private HashMap segmentToAddress = new HashMap();
private boolean running = true;
@@ -128,6 +128,7 @@
public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException {
this.conf = conf;
this.defaultAddresses = addresses;
+ this.liveServer = new boolean[addresses.length];
updateSegments();
setDaemon(true);
start();
@@ -162,7 +163,9 @@
int liveServers=0;
int liveSegments=0;
- Vector liveAddresses=new Vector();
+
+ // Create new array of flags so they can all be updated at once.
+ boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
// build segmentToAddress map
Object[][] params = new Object[defaultAddresses.length][0];
@@ -173,6 +176,7 @@
InetSocketAddress addr = defaultAddresses[i];
String[] segments = results[i];
if (segments == null) {
+ updatedLiveServer[i] = false;
if (LOG.isWarnEnabled()) {
LOG.warn("Client: no segments from: " + addr);
}
@@ -184,13 +188,13 @@
}
segmentToAddress.put(segments[j], addr);
}
- liveAddresses.add(addr);
+ updatedLiveServer[i] = true;
liveServers++;
liveSegments+=segments.length;
}
- this.liveAddresses = (InetSocketAddress[]) // update liveAddresses
- liveAddresses.toArray(new InetSocketAddress[liveAddresses.size()]);
+ // Now update live server flags.
+ this.liveServer = updatedLiveServer;
if (LOG.isInfoEnabled()) {
LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
@@ -206,7 +210,26 @@
public Hits search(final Query query, final int numHits,
final String dedupField, final String sortField,
final boolean reverse) throws IOException {
- long totalHits = 0;
+ // Get the list of live servers. It would be nice to build this
+ // list in updateSegments(), but that would create concurrency issues.
+ // We grab a local reference to the live server flags in case it
+ // is updated while we are building our list of liveAddresses.
+ boolean[] savedLiveServer = this.liveServer;
+ int numLive = 0;
+ for (int i = 0; i < savedLiveServer.length; i++) {
+ if (savedLiveServer[i])
+ numLive++;
+ }
+ InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive];
+ int[] liveIndexNos = new int[numLive];
+ int k = 0;
+ for (int i = 0; i < savedLiveServer.length; i++) {
+ if (savedLiveServer[i]) {
+ liveAddresses[k] = defaultAddresses[i];
+ liveIndexNos[k] = i;
+ k++;
+ }
+ }
Object[][] params = new Object[liveAddresses.length][5];
for (int i = 0; i < params.length; i++) {
@@ -230,6 +253,7 @@
queue = new TreeSet();
}
+ long totalHits = 0;
Comparable maxValue = null;
for (int i = 0; i < results.length; i++) {
Hits hits = results[i];
@@ -241,7 +265,7 @@
((reverse || sortField == null)
? h.getSortValue().compareTo(maxValue) >= 0
: h.getSortValue().compareTo(maxValue) <= 0)) {
- queue.add(new Hit(i, h.getIndexDocNo(),
+ queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(),
h.getSortValue(), h.getDedupValue()));
if (queue.size() > numHits) { // if hit queue overfull
queue.remove(queue.last()); // remove lowest in hit queue
@@ -255,7 +279,7 @@
private Protocol getRemote(Hit hit) {
return (Protocol)
- RPC.getProxy(Protocol.class, liveAddresses[hit.getIndexNo()], conf);
+ RPC.getProxy(Protocol.class, defaultAddresses[hit.getIndexNo()], conf);
}
private Protocol getRemote(HitDetails hit) {
@@ -276,7 +300,7 @@
InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
Object[][] params = new Object[hits.length][1];
for (int i = 0; i < hits.length; i++) {
- addrs[i] = liveAddresses[hits[i].getIndexNo()];
+ addrs[i] = defaultAddresses[hits[i].getIndexNo()];
params[i][0] = hits[i];
}
return (HitDetails[])RPC.call(DETAILS, params, addrs, conf);
@@ -368,7 +392,7 @@
updateSegments();
} catch (IOException ioe) {
if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); }
- liveAddresses=new InetSocketAddress[0];
+ liveServer = new boolean[defaultAddresses.length];
}
}
}
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Nutch-cvs mailing list
Nutch-cvs@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/nutch-cvs