Update of /cvsroot/nutch/playground/src/java/net/nutch/ipc
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10313/src/java/net/nutch/ipc

Added Files:
        package.html Client.java Server.java 
Log Message:
initial commit

--- NEW FILE: package.html ---
<html>
<body>
Client/Server code used by distributed search.
</body>
</html>

--- NEW FILE: Client.java ---
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.ipc;

import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;

import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

import java.util.Hashtable;
import java.util.logging.Logger;
import java.util.logging.Level;

import net.nutch.util.LogFormatter;
import net.nutch.io.Writable;
import net.nutch.io.UTF8;

/** A client for an IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 * 
 * @author Doug Cutting
 * @see Server
 */
public class Client {
  /** Logger used by the client and by its connection threads. */
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.ipc.Client");

  // pool of open connections, keyed by the server's InetSocketAddress
  private Hashtable connections = new Hashtable();

  private Class valueClass;                       // class of call values
  private int timeout = 10000;                    // timeout for calls
  private int counter;                            // counter for call ids

  // Written by stop() and polled by every connection thread without holding
  // a common lock, so it must be volatile for the update to become visible.
  private volatile boolean running = true;        // true while client runs

  /** A single outstanding request.  Holds the parameter that is sent and,
   * once the connection thread has read the response, either the returned
   * value or the error string. */
  private class Call {
    int id;                                       // call id
    Writable param;                               // parameter
    Writable value;                               // value, null if error
    String error;                                 // error, null if value

    protected Call(Writable param) {
      synchronized (Client.this) {                // ids are allocated per client
        this.id = counter++;
      }
      this.param = param;
    }

    /** Invoked by the connection thread after <code>value</code> or
     * <code>error</code> has been filled in.  The default implementation
     * wakes a thread waiting on this call's monitor. */
    public void callComplete() {
      synchronized (this) {
        notify();                                 // notify caller
      }
    }
  }

  /** Thread that reads responses and notifies callers.  Each connection owns a
   * socket connected to a remote address.  Calls are multiplexed through this
   * socket: responses may be delivered out of order, and are matched to their
   * call by id. */
  private class Connection extends Thread {
    private InetSocketAddress address;            // address of server
    private Socket socket;                        // connected socket
    private DataInputStream in;
    private DataOutputStream out;
    private Hashtable calls = new Hashtable();    // currently active calls

    /** Connects to <code>address</code> and prepares buffered streams.  The
     * thread is not started here.
     *
     * @param address the server to connect to
     * @throws IOException if the socket cannot be connected or configured
     */
    public Connection(InetSocketAddress address) throws IOException {
      this.address = address;
      this.socket = new Socket(address.getAddress(), address.getPort());
      boolean ready = false;
      try {
        socket.setSoTimeout(timeout);
        this.in = new DataInputStream
          (new BufferedInputStream(socket.getInputStream()));
        this.out = new DataOutputStream
          (new BufferedOutputStream(socket.getOutputStream()));
        ready = true;
      } finally {
        if (!ready) {                             // don't leak a half-set-up socket
          try {
            socket.close();
          } catch (IOException ignored) {}
        }
      }
      this.setDaemon(true);
      this.setName("Client connection to "
                   + address.getAddress().getHostAddress()
                   + ":" + address.getPort());
    }

    /** Reads responses until the client is stopped or the stream fails.  Each
     * response is an id, an error flag, and then either an error string or a
     * value. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          int id;
          try {
            id = in.readInt();                    // try to read an id
          } catch (SocketTimeoutException e) {
            continue;                             // nothing yet: re-check running
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " got value #" + id);

          Call call = (Call)calls.remove(new Integer(id));

          // Always consume the whole response, even when no call is waiting
          // for it, so that the stream stays aligned for later responses.
          String error = null;
          Writable value = null;
          boolean isError = in.readBoolean();     // read if error
          if (isError) {
            UTF8 utf8 = new UTF8();
            utf8.readFields(in);                  // read error string
            error = utf8.toString();
          } else {
            value = makeValue();
            value.readFields(in);                 // read value
          }

          if (call == null) {                     // unknown id: nobody to tell
            LOG.info(getName() + " discarding value #" + id
                     + ": no such call");
            continue;
          }

          call.value = value;
          call.error = error;
          call.callComplete();                    // deliver result to caller
        }
      } catch (Exception e) {
        LOG.log(Level.INFO, getName() + " caught: " + e, e);
      } finally {
        close();
      }
    }

    /** Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.  The connection is closed if the parameter cannot be written.
     *
     * @param call the call to register and send
     * @throws IOException if writing to the socket fails
     */
    public void sendParam(Call call) throws IOException {
      boolean error = true;
      try {
        calls.put(new Integer(call.id), call);    // register before sending
        synchronized (out) {                      // one writer at a time
          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " sending #" + call.id);
          out.writeInt(call.id);
          call.param.write(out);
          out.flush();
        }
        error = false;
      } finally {
        if (error)
          close();                                // close on error
      }
    }

    /** Close the connection and remove it from the pool.  May be called more
     * than once (after a failed send and again when the reader thread exits),
     * so the pool entry is removed only if it still refers to this
     * connection; a replacement registered for the same address is left in
     * place. */
    public void close() {
      LOG.info(getName() + ": closing");
      synchronized (connections) {
        if (connections.get(address) == this)
          connections.remove(address);            // remove connection
      }
      try {
        socket.close();                           // close socket
      } catch (IOException e) {}
    }

  }

  /** A call whose completion is reported to a shared result collector rather
   * than to a thread waiting on the call itself. */
  private class ParallelCall extends Call {
    private ParallelResults results;
    private int index;

    public ParallelCall(Writable param, ParallelResults results, int index) {
      super(param);
      this.index = index;
      this.results = results;
    }

    /** Hands this finished call to the result collector. */
    public void callComplete() {
      ParallelResults collector = this.results;
      collector.callComplete(this);
    }
  }

  /** Gathers the values of a batch of parallel calls and wakes the waiting
   * caller once the expected number of calls has completed. */
  private static class ParallelResults {
    private Writable[] values;
    private int size;
    private int count;

    public ParallelResults(int size) {
      this.size = size;
      this.values = new Writable[size];
    }

    /** Records the value of a completed call in the slot given by the call's
     * index, and notifies the waiter when the last expected call is in. */
    public void callComplete(ParallelCall call) {
      synchronized (this) {
        values[call.index] = call.value;          // store the value
        count += 1;                               // count it
        if (count == size) {                      // if all values are in
          notify();                               // then notify waiting caller
        }
      }
    }
  }

  /** Construct an IPC client whose values are of the given {@link Writable}
   * class.  A new instance of this class is created for every value read
   * from a server.
   *
   * @param valueClass the class of the values returned by calls
   * @throws NullPointerException if <code>valueClass</code> is null
   */
  public Client(Class valueClass) {
    // Fail here rather than later on a connection thread, where the failure
    // would only show up to callers as a timeout.
    if (valueClass == null)
      throw new NullPointerException("valueClass");
    this.valueClass = valueClass;
  }

  /** Stop all threads related to this client.  No further calls may be made
   * using this client.  Blocks for up to the configured timeout so that
   * calls in progress can complete.  If the calling thread is interrupted
   * while waiting, the client is stopped immediately and the thread's
   * interrupt status is preserved. */
  public void stop() {
    LOG.info("Stopping client");
    try {
      Thread.sleep(timeout);                        // let all calls complete
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();           // don't lose the interrupt
    } finally {
      running = false;
    }
  }

  /** Sets the timeout, in milliseconds, used for network i/o and for waiting
   * on responses.  A connection's socket timeout is set when the connection
   * is created, so connections that are already open keep the value that was
   * in effect at that time. */
  public void setTimeout(int timeout) {
    this.timeout = timeout;
  }

  /** Make a call, passing <code>param</code>, to the IPC server running at
   * <code>address</code>, returning the value.  Throws exceptions if there are
   * network problems or if the remote code threw an exception.
   *
   * @param param the parameter to send
   * @param address the server to call
   * @return the value returned by the server
   * @throws IOException if the parameter cannot be sent, if the server
   *   reports an error, or if no response arrives within the timeout
   * @throws java.io.InterruptedIOException if the calling thread is
   *   interrupted before a response arrives; the thread's interrupt status
   *   is left set
   */
  public Writable call(Writable param, InetSocketAddress address)
    throws IOException {
    Connection connection = getConnection(address);
    Call call = new Call(param);
    boolean interrupted = false;
    synchronized (call) {
      // Send while holding the call's monitor so that the response cannot be
      // delivered before this thread is waiting for it.
      connection.sendParam(call);                 // send the parameter

      // Wait in a loop: Object.wait may return without a notify, and that
      // must not be mistaken for a timeout.  A timeout of zero waits
      // indefinitely, as a plain wait(0) would.
      long limit = timeout;
      long deadline = System.currentTimeMillis() + limit;
      while (call.value == null && call.error == null) {
        long remaining = deadline - System.currentTimeMillis();
        if (limit != 0 && remaining <= 0)
          break;                                  // timed out
        try {
          call.wait(limit == 0 ? 0 : remaining);  // wait for the result
        } catch (InterruptedException e) {
          interrupted = true;
          break;
        }
      }

      if (interrupted)
        Thread.currentThread().interrupt();       // preserve interrupt status

      if (call.error != null) {
        throw new IOException(call.error);
      } else if (call.value != null) {
        return call.value;
      } else if (interrupted) {
        throw new java.io.InterruptedIOException
          ("interrupted waiting for response");
      } else {
        throw new IOException("timed out waiting for response");
      }
    }
  }

  /** Makes a set of calls in parallel.  Each parameter is sent to the
   * corresponding address.  When all values are available, or have timed out
   * or errored, the collected results are returned in an array.  The array
   * contains nulls for calls that timed out or errored.
   *
   * @param params the parameters to send, one per call
   * @param addresses the server for each parameter; must have at least as
   *   many elements as <code>params</code>
   * @return an array, parallel to <code>params</code>, of the values received
   * @throws IOException if no call completed at all
   * @throws IllegalArgumentException if there are fewer addresses than
   *   parameters
   */
  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
    throws IOException {
    if (params.length == 0) return new Writable[0];

    // Check up front instead of failing with an index error after some of
    // the calls have already been sent.
    if (addresses.length < params.length)
      throw new IllegalArgumentException
        ("got " + params.length + " params but only "
         + addresses.length + " addresses");

    ParallelResults results = new ParallelResults(params.length);
    synchronized (results) {
      // The monitor is held while sending, so no completion can be recorded
      // (or its notification lost) before this thread starts waiting.
      for (int i = 0; i < params.length; i++) {
        ParallelCall call = new ParallelCall(params[i], results, i);
        try {
          Connection connection = getConnection(addresses[i]);
          connection.sendParam(call);             // send each parameter
        } catch (IOException e) {
          LOG.info("Calling "+addresses[i]+" caught: " + e); // log errors
          results.size--;                         //  wait for one fewer result
        }
      }

      // Wait only while results are still outstanding.  This returns at once
      // when every send failed, and re-waits after a wakeup that did not
      // complete the set.  A timeout of zero waits indefinitely.
      boolean interrupted = false;
      long limit = timeout;
      long deadline = System.currentTimeMillis() + limit;
      while (results.count < results.size) {
        long remaining = deadline - System.currentTimeMillis();
        if (limit != 0 && remaining <= 0)
          break;                                  // timed out
        try {
          results.wait(limit == 0 ? 0 : remaining); // wait for all results
        } catch (InterruptedException e) {
          interrupted = true;
          break;
        }
      }

      if (interrupted)
        Thread.currentThread().interrupt();       // preserve interrupt status

      if (results.count == 0) {
        throw new IOException("no responses");
      } else {
        return results.values;
      }
    }
  }

  /** Returns the pooled connection for <code>address</code>.  If there is
   * none, a new connection is opened, registered in the pool and started
   * before it is returned.  The lookup and the creation both happen while
   * holding the pool's monitor. */
  private Connection getConnection(InetSocketAddress address)
    throws IOException {
    synchronized (connections) {
      Connection pooled = (Connection)connections.get(address);
      if (pooled != null) {
        return pooled;                            // reuse existing connection
      }
      Connection created = new Connection(address);
      connections.put(address, created);
      created.start();
      return created;
    }
  }

  /** Creates an empty value of the class given to the constructor, using its
   * no-argument constructor.
   *
   * @return a new, uninitialized value
   * @throws RuntimeException if the class cannot be instantiated; the
   *   original exception is kept as the cause
   */
  private Writable makeValue() {
    try {
      return (Writable)valueClass.newInstance();  // construct value
    } catch (InstantiationException e) {
      throw new RuntimeException(e.toString(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e.toString(), e);
    }
  }

}

--- NEW FILE: Server.java ---
/* Copyright (c) 2003 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.ipc;

import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

import java.util.LinkedList;
import java.util.logging.Logger;
import java.util.logging.Level;

import net.nutch.util.LogFormatter;
import net.nutch.io.Writable;
import net.nutch.io.UTF8;

/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value.  A service runs on
 * a port and is defined by a parameter class and a value class.
 * 
 * @author Doug Cutting
 * @see Client
 */
public abstract class Server {
  /** Logger used by the server and by all of its threads. */
  public static final Logger LOG =
    LogFormatter.getLogger("net.nutch.ipc.Server");

  private int port;                               // port we listen on
  private int handlerCount;                       // number of handler threads
  private int maxQueuedCalls;                     // max number of queued calls
  private Class paramClass;                       // class of call parameters

  private int timeout = 10000;                    // timeout for i/o

  // Written by stop() and polled by the listener, connection and handler
  // threads, most of which read it without holding the server's monitor, so
  // it must be volatile for the update to become visible to them.
  private volatile boolean running = true;        // true while server runs

  private LinkedList callQueue = new LinkedList(); // queued calls
  private Object callDequeued = new Object();     // used by wait/notify

  /** A request that has been read from a client and is waiting for, or being
   * processed by, a handler thread. */
  private static class Call {
    private int id;                               // the client's call id
    private Writable param;                       // the parameter passed
    private Connection connection;                // connection to client

    public Call(int id, Writable param, Connection connection) {
      this.connection = connection;
      this.param = param;
      this.id = id;
    }
  }

  /** Listens on the socket, starting new connection threads. */
  private class Listener extends Thread {
    private ServerSocket socket;

    /** Binds the server socket to the server's port.
     *
     * @throws IOException if the port cannot be bound
     */
    public Listener() throws IOException {
      this.socket = new ServerSocket(port);
      socket.setSoTimeout(timeout);               // so run() can poll running
      this.setDaemon(true);
      this.setName("Server listener on port " + port);
    }

    /** Accepts clients until the server is stopped, then closes the server
     * socket. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          Socket accepted = null;
          try {
            accepted = socket.accept();
            new Connection(accepted).start();     // start a new connection
            accepted = null;                      // now owned by the connection
          } catch (SocketTimeoutException e) {    // ignore timeouts
          } catch (Exception e) {                 // log all other exceptions
            LOG.log(Level.INFO, getName() + " caught: " + e, e);
          } finally {
            if (accepted != null) {               // connection never started:
              try {                               // don't leak its socket
                accepted.close();
              } catch (IOException ignored) {}
            }
          }
        }
      } finally {
        try {
          socket.close();
        } catch (IOException e) {}
        LOG.info(getName() + ": exiting");
      }
    }
  }

  /** Reads calls from a connection and queues them for handling. */
  private class Connection extends Thread {
    private Socket socket;
    private DataInputStream in;
    private DataOutputStream out;

    /** Wraps an accepted socket in buffered data streams.
     *
     * @param socket the socket accepted from a client
     * @throws IOException if the socket cannot be configured or its streams
     *   cannot be opened
     */
    public Connection(Socket socket) throws IOException {
      this.socket = socket;
      socket.setSoTimeout(timeout);
      this.in = new DataInputStream
        (new BufferedInputStream(socket.getInputStream()));
      this.out = new DataOutputStream
        (new BufferedOutputStream(socket.getOutputStream()));
      this.setDaemon(true);
      this.setName("Server connection on port " + port + " from "
                   + socket.getInetAddress().getHostAddress());
    }

    /** Reads calls (an id followed by a parameter) and queues them until the
     * server is stopped or the stream fails, then closes the socket. */
    public void run() {
      LOG.info(getName() + ": starting");
      try {
        while (running) {
          int id;
          try {
            id = in.readInt();                    // try to read an id
          } catch (SocketTimeoutException e) {
            continue;                             // nothing yet: re-check running
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + " got #" + id);

          Writable param = makeParam();           // read param
          param.readFields(in);

          Call call = new Call(id, param, this);

          synchronized (callQueue) {
            callQueue.addLast(call);              // queue the call
            callQueue.notify();                   // wake up a waiting handler
          }

          waitForQueueSpace();                    // stop reading while full
        }
      } catch (Exception e) {
        LOG.log(Level.INFO, getName() + " caught: " + e, e);
      } finally {
        try {
          socket.close();
        } catch (IOException e) {}
        LOG.info(getName() + ": exiting");
      }
    }

    /** Blocks while the call queue is full and the server is running.  The
     * check is made while holding the <code>callDequeued</code> monitor, so a
     * handler's notification cannot slip in between the check and the wait. */
    private void waitForQueueSpace() throws InterruptedException {
      synchronized (callDequeued) {
        while (running && queueIsFull()) {        // queue is full
          callDequeued.wait(timeout);             // wait for a dequeue
        }
      }
    }

    /** Reads the queue length under the queue's own monitor, since the list
     * is modified concurrently by other threads. */
    private boolean queueIsFull() {
      synchronized (callQueue) {
        return callQueue.size() >= maxQueuedCalls;
      }
    }

  }

  /** Handles queued calls . */
  private class Handler extends Thread {
    public Handler() {
      this.setDaemon(true);
      this.setName("Server handler on " + port);
    }

    public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        try {
          Call call;
          synchronized (callQueue) {
            while (running && callQueue.size()==0) { // wait for a call
              callQueue.wait(timeout);
            }
            if (!running) break;
            call = (Call)callQueue.removeFirst(); // pop the queue
          }

          synchronized (callDequeued) {           // tell others we've dequeued
            callDequeued.notify();
          }

          if (LOG.isLoggable(Level.FINE))
            LOG.fine(getName() + ": has #" + call.id + " from " +
                     call.connection.socket.getInetAddress().getHostAddress());
          
          String error = null;
          Writable value = null;
          try {
            value = call(call.param);             // make the call
          } catch (Exception e) {
            LOG.log(Level.INFO, getName() + " call error: " + e, e);
            error = e.toString();
          }
            
          DataOutputStream out = call.connection.out;
          synchronized (out) {
            out.writeInt(call.id);                // write call id
            out.writeBoolean(error!=null);        // write error flag
            if (error != null)
              value = new UTF8(error);
            value.write(out);                     // write value
            out.flush();
          }

        } catch (Exception e) {
          LOG.log(Level.INFO, getName() + " caught: " + e, e);
        }
      }
      LOG.info(getName() + ": exiting");
    }
  }
  
  /** Constructs a server listening on the named port.  Parameters passed must
   * be of the named class.  The <code>handlerCount</code> determines the
   * number of handler threads that will be used to process calls; it is also
   * used as the maximum number of queued calls.
   */
  protected Server(int port, Class paramClass, int handlerCount) {
    this.handlerCount = handlerCount;
    this.maxQueuedCalls = handlerCount;
    this.paramClass = paramClass;
    this.port = port;
  }

  /** Sets the timeout, in milliseconds, used for network i/o and for the
   * server's internal waits.  Socket timeouts are applied when a socket is
   * set up, so sockets that already exist keep the value that was in effect
   * at that time. */
  public void setTimeout(int timeout) {
    this.timeout = timeout;
  }

  /** Starts the service by launching the listener thread and then the
   * handler threads.  Must be called before any calls will be handled.
   *
   * @throws IOException if the listener cannot be created
   */
  public synchronized void start() throws IOException {
    new Listener().start();

    int started = 0;
    while (started < handlerCount) {
      new Handler().start();
      started++;
    }
  }

  /** Stops the service.  No calls will be handled after this is called.  All
   * threads will exit.  Blocks for up to the configured timeout to give the
   * threads time to notice, then wakes every thread waiting in
   * <code>join()</code>.  If the calling thread is interrupted while waiting,
   * the wait is cut short and the thread's interrupt status is preserved. */
  public synchronized void stop() {
    LOG.info("Stopping server on " + port);
    running = false;
    try {
      Thread.sleep(timeout);                        // let all threads exit
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();           // don't lose the interrupt
    } finally {
      notifyAll();                                  // wake all joiners, not one
    }
  }

  /** Wait for the server to be stopped.  Returns immediately if the server
   * has already been stopped.
   *
   * @throws InterruptedException if the calling thread is interrupted while
   *   waiting
   */
  public synchronized void join() throws InterruptedException {
    // Guard the wait with the condition: a bare wait() can return without a
    // notify, and would never return if stop() had already finished.
    while (running) {
      wait();
    }
  }

  /** Called for each call.  A handler thread invokes this with the parameter
   * read from the client and writes the returned value back on the same
   * connection.  An exception thrown here is logged, and its
   * <code>toString()</code> is sent to the client as the error string.
   * Several handler threads may be running, so this may be invoked
   * concurrently.  Implementations should return a non-null value, since the
   * handler serializes the result by calling its <code>write</code> method.
   *
   * @param param the parameter received from the client
   * @return the value to send back to the client
   * @throws IOException if the call fails
   */
  public abstract Writable call(Writable param) throws IOException;

  
  /** Creates an empty parameter of the class given to the constructor, using
   * its no-argument constructor.
   *
   * @return a new, uninitialized parameter
   * @throws RuntimeException if the class cannot be instantiated; the
   *   original exception is kept as the cause
   */
  private Writable makeParam() {
    try {
      return (Writable)paramClass.newInstance();  // construct param
    } catch (InstantiationException e) {
      throw new RuntimeException(e.toString(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e.toString(), e);
    }
  }

}



-------------------------------------------------------
The SF.Net email is sponsored by EclipseCon 2004
Premiere Conference on Open Tools Development and Integration
See the breadth of Eclipse activity. February 3-5 in Anaheim, CA.
http://www.eclipsecon.org/osdn
_______________________________________________
Nutch-cvs mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/nutch-cvs

Reply via email to