Update of /cvsroot/nutch/playground/src/java/net/nutch/ipc
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10313/src/java/net/nutch/ipc
Added Files:
package.html Client.java Server.java
Log Message:
initial commit
--- NEW FILE: package.html ---
<html>
<body>
Client/Server code used by distributed search.
</body>
</html>
--- NEW FILE: Client.java ---
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.ipc;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.util.Hashtable;
import java.util.logging.Logger;
import java.util.logging.Level;
import net.nutch.util.LogFormatter;
import net.nutch.io.Writable;
import net.nutch.io.UTF8;
/** A client for an IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 *
 * <p>All calls to one address share a single socket.  Each socket is owned by
 * a daemon {@link Thread} that reads responses and hands them to the callers
 * waiting for them, so responses may arrive in any order.
 *
 * @author Doug Cutting
 * @see Server
 */
public class Client {
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.ipc.Client");

  private Hashtable connections = new Hashtable(); // address -> Connection
  private Class valueClass;                        // class of call values
  // volatile: set by setTimeout(), read by caller and connection threads
  private volatile int timeout = 10000;            // timeout for calls, in ms
  private int counter;                             // counter for call ids
  // volatile: cleared by stop(), polled by every connection thread
  private volatile boolean running = true;         // true while client runs

  /** A call waiting for a value. */
  private class Call {
    int id;                                       // call id
    Writable param;                               // parameter
    Writable value;                               // value, null if error
    String error;                                 // error, null if value
    boolean done;                                 // true once a response arrived

    /** Creates a call for <code>param</code> and assigns it the next id. */
    protected Call(Writable param) {
      this.param = param;
      synchronized (Client.this) {                // counter is shared by all calls
        this.id = counter++;
      }
    }

    /** Called by the connection thread when the call is complete and the
     * value or error string are available.  Marks the call as done and
     * notifies the waiting caller by default. */
    public synchronized void callComplete() {
      done = true;                                // lets the caller tell a real
      notify();                                   // wakeup from a spurious one
    }
  }

  /** Thread that reads responses and notifies callers.  Each connection owns a
   * socket connected to a remote address.  Calls are multiplexed through this
   * socket: responses may be delivered out of order. */
  private class Connection extends Thread {
    private InetSocketAddress address;            // address of server
    private Socket socket;                        // connected socket
    private DataInputStream in;
    private DataOutputStream out;
    private Hashtable calls = new Hashtable();    // currently active calls

    /** Connects to <code>address</code> and prepares the buffered streams.
     * The thread is not started here.
     *
     * @throws IOException if the socket cannot be connected or configured
     */
    public Connection(InetSocketAddress address) throws IOException {
      this.address = address;
      this.socket = new Socket(address.getAddress(), address.getPort());
      boolean ready = false;
      try {
        socket.setSoTimeout(timeout);
        this.in = new DataInputStream
          (new BufferedInputStream(socket.getInputStream()));
        this.out = new DataOutputStream
          (new BufferedOutputStream(socket.getOutputStream()));
        ready = true;
      } finally {
        if (!ready) {                             // don't leak a half-built socket
          try {
            socket.close();
          } catch (IOException e) {
            // nothing more can be done; the original failure propagates
          }
        }
      }
      this.setDaemon(true);
      this.setName("Client connection to "
                   + address.getAddress().getHostAddress()
                   + ":" + address.getPort());
    }

    /** Reads responses until the client stops or the socket fails.  Each
     * response is an int call id, a boolean error flag, and then either a
     * {@link UTF8} error string or a value of the client's value class. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          int id;
          try {
            id = in.readInt();                    // try to read an id
          } catch (SocketTimeoutException e) {
            continue;                             // idle: re-check running flag
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " got value #" + id);

          Call call = (Call)calls.remove(new Integer(id));

          // Always consume the whole response, even when nobody is waiting
          // for it, so that the stream stays aligned on the next call id.
          boolean isError = in.readBoolean();     // read if error
          String error = null;
          Writable value = null;
          if (isError) {
            UTF8 utf8 = new UTF8();
            utf8.readFields(in);                  // read error string
            error = utf8.toString();
          } else {
            value = makeValue();
            value.readFields(in);                 // read value
          }

          if (call == null) {                     // caller gave up (timed out)
            if (LOG.isLoggable(Level.FINE))
              LOG.fine(getName() + " discarding value #" + id);
            continue;
          }

          synchronized (call) {                   // publish both fields together
            call.value = value;
            call.error = error;
          }
          call.callComplete();                    // deliver result to caller
        }
      } catch (Exception e) {
        LOG.log(Level.INFO, getName() + " caught: " + e, e);
      } finally {
        close();
      }
    }

    /** Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.  On failure the call is unregistered and the connection is
     * closed.
     *
     * @throws IOException if the parameter cannot be written
     */
    public void sendParam(Call call) throws IOException {
      boolean error = true;
      try {
        calls.put(new Integer(call.id), call);    // register before sending
        synchronized (out) {                      // one writer at a time
          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " sending #" + call.id);
          out.writeInt(call.id);
          call.param.write(out);
          out.flush();
        }
        error = false;
      } finally {
        if (error) {
          discard(call);                          // it will never be answered
          close();                                // close on error
        }
      }
    }

    /** Forgets a call whose caller no longer waits for a response. */
    public void discard(Call call) {
      calls.remove(new Integer(call.id));
    }

    /** Close the connection and remove it from the pool.  May be called more
     * than once; only this connection's own pool entry is removed. */
    public void close() {
      LOG.info(getName() + ": closing");
      synchronized (connections) {
        // A replacement connection may already be pooled under this address.
        if (connections.get(address) == this)
          connections.remove(address);            // remove connection
      }
      try {
        socket.close();                           // close socket
      } catch (IOException e) {
        // best effort: the connection is being abandoned anyway
      }
    }
  }

  /** Call implementation used for parallel calls. */
  private class ParallelCall extends Call {
    private ParallelResults results;
    private int index;

    /** Creates a call whose result is stored at <code>index</code> of
     * <code>results</code>. */
    public ParallelCall(Writable param, ParallelResults results, int index) {
      super(param);
      this.results = results;
      this.index = index;
    }

    /** Deliver result to result collector. */
    public void callComplete() {
      results.callComplete(this);
    }
  }

  /** Result collector for parallel calls. */
  private static class ParallelResults {
    private Writable[] values;
    private int size;                             // number of results expected
    private int count;                            // number of results received

    /** Creates a collector expecting <code>size</code> results. */
    public ParallelResults(int size) {
      this.values = new Writable[size];
      this.size = size;
    }

    /** Collect a result. */
    public synchronized void callComplete(ParallelCall call) {
      values[call.index] = call.value;            // store the value
      count++;                                    // count it
      if (count == size)                          // if all values are in
        notify();                                 // then notify waiting caller
    }
  }

  /** Construct an IPC client whose values are of the given {@link Writable}
   * class. */
  public Client(Class valueClass) {
    this.valueClass = valueClass;
  }

  /** Stop all threads related to this client.  No further calls may be made
   * using this client.  Blocks for the configured timeout first so that calls
   * in progress can complete. */
  public void stop() {
    LOG.info("Stopping client");
    try {
      Thread.sleep(timeout);                      // let all calls complete
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();         // keep the interrupt visible
    }
    running = false;
  }

  /** Sets the timeout used for network i/o, in milliseconds. */
  public void setTimeout(int timeout) { this.timeout = timeout; }

  /** Make a call, passing <code>param</code>, to the IPC server running at
   * <code>address</code>, returning the value.  Throws exceptions if there are
   * network problems or if the remote code threw an exception.
   *
   * @throws IOException if the call cannot be sent, the server reported an
   * error, or no response arrived within the timeout
   */
  public Writable call(Writable param, InetSocketAddress address)
    throws IOException {
    Connection connection = getConnection(address);
    Call call = new Call(param);
    synchronized (call) {
      connection.sendParam(call);                 // send the parameter

      int limit = timeout;
      long start = System.currentTimeMillis();
      boolean interrupted = false;
      while (!call.done) {                        // loop: wakeups may be spurious
        long remaining = 0;                       // 0 waits without limit
        if (limit != 0) {
          remaining = limit - (System.currentTimeMillis() - start);
          if (remaining <= 0)
            break;                                // timed out
        }
        try {
          call.wait(remaining);                   // wait for the result
        } catch (InterruptedException e) {
          interrupted = true;
          break;
        }
      }

      if (!call.done)
        connection.discard(call);                 // don't leak abandoned calls
      if (interrupted)
        Thread.currentThread().interrupt();       // keep the interrupt visible

      if (call.error != null) {
        throw new IOException(call.error);
      } else if (call.value == null) {
        throw new IOException("timed out waiting for response");
      } else {
        return call.value;
      }
    }
  }

  /** Makes a set of calls in parallel.  Each parameter is sent to the
   * corresponding address.  When all values are available, or have timed out
   * or errored, the collected results are returned in an array.  The array
   * contains nulls for calls that timed out or errored.
   *
   * @throws IOException if not a single response was received
   */
  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
    throws IOException {
    if (params.length == 0) return new Writable[0];

    ParallelResults results = new ParallelResults(params.length);
    ParallelCall[] sent = new ParallelCall[params.length];
    Connection[] sentOn = new Connection[params.length];
    synchronized (results) {
      for (int i = 0; i < params.length; i++) {
        ParallelCall call = new ParallelCall(params[i], results, i);
        try {
          Connection connection = getConnection(addresses[i]);
          connection.sendParam(call);             // send each parameter
          sent[i] = call;
          sentOn[i] = connection;
        } catch (IOException e) {
          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
          results.size--;                         // wait for one fewer result
        }
      }

      int limit = timeout;
      long start = System.currentTimeMillis();
      boolean interrupted = false;
      // Skipped entirely when every send failed: nothing can arrive.
      while (results.count < results.size) {      // loop: wakeups may be spurious
        long remaining = 0;                       // 0 waits without limit
        if (limit != 0) {
          remaining = limit - (System.currentTimeMillis() - start);
          if (remaining <= 0)
            break;                                // timed out
        }
        try {
          results.wait(remaining);                // wait for all results
        } catch (InterruptedException e) {
          interrupted = true;
          break;
        }
      }

      if (results.count < results.size) {         // forget unanswered calls
        for (int i = 0; i < sent.length; i++) {
          if (sentOn[i] != null)
            sentOn[i].discard(sent[i]);           // no-op if already answered
        }
      }
      if (interrupted)
        Thread.currentThread().interrupt();       // keep the interrupt visible

      if (results.count == 0) {
        throw new IOException("no responses");
      } else {
        return results.values;
      }
    }
  }

  /** Get a connection from the pool, or create a new one and add it to the
   * pool.  Connections to a given host/port are reused.
   *
   * @throws IOException if a new connection cannot be established
   */
  private Connection getConnection(InetSocketAddress address)
    throws IOException {
    Connection connection;
    synchronized (connections) {
      connection = (Connection)connections.get(address);
      if (connection == null) {
        connection = new Connection(address);
        connections.put(address, connection);
        connection.start();
      }
    }
    return connection;
  }

  /** Creates an empty instance of the value class, to be filled in from a
   * response.
   *
   * @throws RuntimeException if the value class cannot be instantiated
   */
  private Writable makeValue() {
    Writable value;                               // construct value
    try {
      value = (Writable)valueClass.newInstance();
    } catch (InstantiationException e) {
      throw new RuntimeException(e.toString(), e);  // keep the cause
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e.toString(), e);  // keep the cause
    }
    return value;
  }
}
--- NEW FILE: Server.java ---
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.ipc;
import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
import java.util.logging.Logger;
import java.util.logging.Level;
import net.nutch.util.LogFormatter;
import net.nutch.io.Writable;
import net.nutch.io.UTF8;
/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 *
 * <p>A listener thread accepts sockets, one connection thread per socket
 * reads calls into a bounded queue, and a fixed number of handler threads
 * take calls off the queue, invoke {@link #call(Writable)} and write the
 * response back on the originating socket.
 *
 * @author Doug Cutting
 * @see Client
 */
public abstract class Server {
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.ipc.Server");

  private int port;                               // port we listen on
  private int handlerCount;                       // number of handler threads
  private int maxQueuedCalls;                     // max number of queued calls
  private Class paramClass;                       // class of call parameters
  // volatile: set by setTimeout(), read by all server threads
  private volatile int timeout = 10000;           // timeout for i/o, in ms
  // volatile: cleared by stop(), polled by all server threads
  private volatile boolean running = true;        // true while server runs

  // Queued calls.  The list is also the single monitor for both "a call was
  // queued" and "a call was dequeued", so every access is made while holding
  // its lock and waiters are woken with notifyAll().
  private LinkedList callQueue = new LinkedList();

  /** A call queued for handling. */
  private static class Call {
    private int id;                               // the client's call id
    private Writable param;                       // the parameter passed
    private Connection connection;                // connection to client

    public Call(int id, Writable param, Connection connection) {
      this.id = id;
      this.param = param;
      this.connection = connection;
    }
  }

  /** Listens on the socket, starting new connection threads. */
  private class Listener extends Thread {
    private ServerSocket socket;

    /** Binds the server socket.
     *
     * @throws IOException if the port cannot be bound
     */
    public Listener() throws IOException {
      this.socket = new ServerSocket(port);
      socket.setSoTimeout(timeout);               // lets run() poll the flag
      this.setDaemon(true);
      this.setName("Server listener on port " + port);
    }

    /** Accepts sockets until the server is stopped. */
    public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        Socket accepted = null;
        try {
          accepted = socket.accept();
          new Connection(accepted).start();       // start a new connection
        } catch (SocketTimeoutException e) {      // ignore timeouts
        } catch (Exception e) {                   // log all other exceptions
          LOG.log(Level.INFO, getName() + " caught: " + e, e);
          if (accepted != null) {                 // don't leak the socket
            try {
              accepted.close();
            } catch (IOException ignored) {
              // best effort: the connection was never usable
            }
          }
        }
      }
      try {
        socket.close();
      } catch (IOException e) {
        // best effort: the listener is exiting anyway
      }
      LOG.info(getName() + ": exiting");
    }
  }

  /** Reads calls from a connection and queues them for handling. */
  private class Connection extends Thread {
    private Socket socket;
    private DataInputStream in;
    private DataOutputStream out;

    /** Wraps an accepted socket in buffered data streams.
     *
     * @throws IOException if the socket cannot be configured
     */
    public Connection(Socket socket) throws IOException {
      this.socket = socket;
      socket.setSoTimeout(timeout);               // lets run() poll the flag
      this.in = new DataInputStream
        (new BufferedInputStream(socket.getInputStream()));
      this.out = new DataOutputStream
        (new BufferedOutputStream(socket.getOutputStream()));
      this.setDaemon(true);
      this.setName("Server connection on port " + port + " from "
                   + socket.getInetAddress().getHostAddress());
    }

    /** Reads calls (an int id followed by a parameter) until the server is
     * stopped or the socket fails.  Stops reading while the call queue is
     * full. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          int id;
          try {
            id = in.readInt();                    // try to read an id
          } catch (SocketTimeoutException e) {
            continue;                             // idle: re-check running flag
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " got #" + id);

          Writable param = makeParam();           // read param
          param.readFields(in);

          Call call = new Call(id, param, this);

          synchronized (callQueue) {
            callQueue.addLast(call);              // queue the call
            // notifyAll: handlers and blocked connections share this monitor
            callQueue.notifyAll();                // wake up a waiting handler

            while (running && callQueue.size() >= maxQueuedCalls) {
              callQueue.wait(timeout);            // queue is full: wait for a dequeue
            }
          }
        }
      } catch (Exception e) {
        LOG.log(Level.INFO, getName() + " caught: " + e, e);
      } finally {
        try {
          socket.close();
        } catch (IOException e) {
          // best effort: the connection is exiting anyway
        }
        LOG.info(getName() + ": exiting");
      }
    }
  }

  /** Handles queued calls. */
  private class Handler extends Thread {
    public Handler() {
      this.setDaemon(true);
      this.setName("Server handler on " + port);
    }

    /** Takes calls off the queue, invokes {@link Server#call(Writable)} and
     * writes the response (id, error flag, then value or error string) until
     * the server is stopped. */
    public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        try {
          Call call;
          synchronized (callQueue) {
            while (running && callQueue.isEmpty()) { // wait for a call
              callQueue.wait(timeout);
            }
            if (!running) break;
            call = (Call)callQueue.removeFirst(); // pop the queue
            callQueue.notifyAll();                // tell others we've dequeued
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + ": has #" + call.id + " from " +
                     call.connection.socket.getInetAddress().getHostAddress());

          String error = null;
          Writable value = null;
          try {
            value = call(call.param);             // make the call
          } catch (Exception e) {
            LOG.log(Level.INFO, getName() + " call error: " + e, e);
            error = e.toString();
          }
          // A null value cannot be serialized.  Report it as an error before
          // anything is written, so a half-written response never reaches
          // the buffered stream.
          if (error == null && value == null)
            error = "call returned null value";

          DataOutputStream out = call.connection.out;
          synchronized (out) {                    // one response at a time
            out.writeInt(call.id);                // write call id
            out.writeBoolean(error!=null);        // write error flag
            if (error != null)
              value = new UTF8(error);
            value.write(out);                     // write value
            out.flush();
          }

        } catch (Exception e) {
          LOG.log(Level.INFO, getName() + " caught: " + e, e);
        }
      }
      LOG.info(getName() + ": exiting");
    }
  }

  /** Constructs a server listening on the named port.  Parameters passed must
   * be of the named class.  The <code>handlerCount</code> determines
   * the number of handler threads that will be used to process calls, and
   * also the maximum number of calls that may be queued.
   */
  protected Server(int port, Class paramClass, int handlerCount) {
    this.port = port;
    this.paramClass = paramClass;
    this.handlerCount = handlerCount;
    this.maxQueuedCalls = handlerCount;
  }

  /** Sets the timeout used for network i/o, in milliseconds. */
  public void setTimeout(int timeout) { this.timeout = timeout; }

  /** Starts the service.  Must be called before any calls will be handled.
   *
   * @throws IOException if the listening socket cannot be opened
   */
  public synchronized void start() throws IOException {
    Listener listener = new Listener();
    listener.start();

    for (int i = 0; i < handlerCount; i++) {
      Handler handler = new Handler();
      handler.start();
    }
  }

  /** Stops the service.  No calls will be handled after this is called.  All
   * threads will exit.  Blocks for the configured timeout so that threads
   * blocked in i/o can notice the stop, then releases every thread waiting in
   * {@link #join()}. */
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    synchronized (callQueue) {
      callQueue.notifyAll();                      // wake idle handlers at once
    }
    try {
      Thread.sleep(timeout);                      // let all threads exit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();         // keep the interrupt visible
    }
    notifyAll();                                  // release every joiner
  }

  /** Wait for the server to be stopped.  Returns immediately if
   * {@link #stop()} has already completed.
   *
   * @throws InterruptedException if the waiting thread is interrupted
   */
  public synchronized void join() throws InterruptedException {
    while (running) {                             // loop: wakeups may be spurious
      wait();
    }
  }

  /** Called for each call.
   *
   * @param param the parameter read from the client
   * @return the value to send back to the client
   * @throws IOException if the call fails; the message is sent to the client
   */
  public abstract Writable call(Writable param) throws IOException;

  /** Creates an empty instance of the parameter class, to be filled in from
   * a request.
   *
   * @throws RuntimeException if the parameter class cannot be instantiated
   */
  private Writable makeParam() {
    Writable param;                               // construct param
    try {
      param = (Writable)paramClass.newInstance();
    } catch (InstantiationException e) {
      throw new RuntimeException(e.toString(), e);  // keep the cause
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e.toString(), e);  // keep the cause
    }
    return param;
  }
}
-------------------------------------------------------
The SF.Net email is sponsored by EclipseCon 2004
Premiere Conference on Open Tools Development and Integration
See the breadth of Eclipse activity. February 3-5 in Anaheim, CA.
http://www.eclipsecon.org/osdn
_______________________________________________
Nutch-cvs mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs