> Why?
>
It can be really useful when your node and Thaw run on two different 
computers, each on a different Internet connection (that's my situation, 
and nextgens' too). In that situation, when you're sending a very large 
file to your node (for insertion), limiting the upload speed lets you, for 
example, keep working over ssh on the machine running the node without too 
much latency.


> On Sun, Jul 16, 2006 at 03:49:12PM +0000, jflesch at freenetproject.org wrote:
> > Author: jflesch
> > Date: 2006-07-16 15:49:09 +0000 (Sun, 16 Jul 2006)
> > New Revision: 9630
> >
> > Added:
> >    trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> > Modified:
> >    trunk/apps/Thaw/src/thaw/core/Config.java
> >    trunk/apps/Thaw/src/thaw/core/Core.java
> >    trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
> >    trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
> >    trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
> >    trunk/apps/Thaw/src/thaw/i18n/thaw.properties
> >    trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
> >    trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> >    trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> > Log:
> > Traffic shapper (for socket output only) is now implemented + Various bug
> > fixes
> >
> > Modified: trunk/apps/Thaw/src/thaw/core/Config.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/core/Config.java       2006-07-15 21:40:40 UTC
> > (rev 9629) +++ trunk/apps/Thaw/src/thaw/core/Config.java    2006-07-16
> > 15:49:09 UTC (rev 9630) @@ -297,6 +297,7 @@
> >             setValue("nodePort", "9481");
> >             setValue("maxSimultaneousDownloads", "-1");
> >             setValue("maxSimultaneousInsertions", "-1");
> > +           setValue("maxUploadSpeed", "-1");
> >             setValue("thawId", "thaw_"+(new Integer((new
> > Random()).nextInt(1000))).toString()); }
> >
> >
> > Modified: trunk/apps/Thaw/src/thaw/core/Core.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/core/Core.java 2006-07-15 21:40:40 UTC (rev
> > 9629) +++ trunk/apps/Thaw/src/thaw/core/Core.java   2006-07-16 15:49:09 UTC
> > (rev 9630) @@ -137,7 +137,8 @@
> >                     }
> >
> >                     connection = new 
> > FCPConnection(config.getValue("nodeAddress"),
> > -                                                  (new 
> > Integer(config.getValue("nodePort"))).intValue());
> > +                                                  (new 
> > Integer(config.getValue("nodePort"))).intValue(),
> > +                                                  (new
> > Integer(config.getValue("maxUploadSpeed"))).intValue());
> >
> >                     if(!connection.connect()) {
> >                             new WarningWindow(this, "Unable to connect to
> > "+config.getValue("nodeAddress")+":"+
> >
> > Modified: trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java      2006-07-15
> > 21:40:40 UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java  2006-07-16 15:49:09
> > UTC (rev 9630) @@ -29,6 +29,7 @@
> >             I18n.getMessage("thaw.config.nodePort"),
> >             I18n.getMessage("thaw.config.maxSimultaneousDownloads"),
> >             I18n.getMessage("thaw.config.maxSimultaneousInsertions"),
> > +           I18n.getMessage("thaw.config.maxUploadSpeed"),
> >             I18n.getMessage("thaw.config.thawId")
> >     };
> >
> > @@ -37,6 +38,7 @@
> >             "nodePort",
> >             "maxSimultaneousDownloads",
> >             "maxSimultaneousInsertions",
> > +           "maxUploadSpeed",
> >             "thawId"
> >     };
> >
> >
> > Added: trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java     2006-07-15
> > 21:40:40 UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-16 15:49:09
> > UTC (rev 9630) @@ -0,0 +1,166 @@
> > +package thaw.fcp;
> > +
> > +import java.net.Socket;
> > +import java.io.InputStream;
> > +import java.io.OutputStream;
> > +import java.io.BufferedInputStream;
> > +import java.io.InputStreamReader;
> > +
> > +import thaw.core.Logger;
> > +
> > +/**
> > + * Only used by FCPConnection. Except special situation, you shouldn't
> > have to use it directly. + * Currently only used for output. (shouldn't
> > be really usefull for input). + * Some data are sent each 'INTERVAL' (in
> > ms).
> > + */
> > +public class FCPBufferedStream implements Runnable {
> > +   private FCPConnection connection;
> > +   private int maxUploadSpeed;
> > +
> > +   private byte outputBuffer[];
> > +
> > +   public final static int OUTPUT_BUFFER_SIZE = 102400;
> > +   public final static int INTERVAL = 200;
> > +
> > +   private int waiting = 0; /* amount of data stored in the buffer */
> > +   private int readCursor = 0; /* indicates where the nex read will be */
> > +   private int writeCursor = 0; /* indicates where the next write will be
> > */ +
> > +   private Thread tractopelle = null;
> > +   private boolean running = true;
> > +   private int packetSize = 0;
> > +
> > +
> > +
> > +   public FCPBufferedStream(FCPConnection connection,
> > +                            int maxUploadSpeed) {
> > +           this.connection = connection;
> > +           this.maxUploadSpeed = maxUploadSpeed;
> > +
> > +           if(maxUploadSpeed >= 0) {
> > +                   outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
> > +                   packetSize = (maxUploadSpeed * 1024) / (1000/INTERVAL);
> > +           }
> > +   }
> > +
> > +   /**
> > +    * Add to the buffer. Can block if buffer is full !
> > +    * Never send more than OUTPUT_BUFFER_SIZE.
> > +    */
> > +   public synchronized boolean write(byte[] data) {
> > +           if(maxUploadSpeed == -1) {
> > +                   return connection.realRawWrite(data);
> > +           }
> > +
> > +           while(waiting + data.length > OUTPUT_BUFFER_SIZE) {
> > +                   sleep(INTERVAL);
> > +           }
> > +
> > +           waiting += data.length;
> > +
> > +           for(int i = 0 ; i < data.length ; i++) {
> > +                   outputBuffer[writeCursor] = data[i];
> > +
> > +                   writeCursor++;
> > +
> > +                   if(writeCursor >= OUTPUT_BUFFER_SIZE)
> > +                           writeCursor = 0;
> > +           }
> > +
> > +           return true;
> > +   }
> > +
> > +   /**
> > +    * @see write(byte[])
> > +    */
> > +   public boolean write(String data) {
> > +           return write(data.getBytes());
> > +   }
> > +
> > +   /**
> > +    * extract from the buffer
> > +    */
> > +   private boolean readOutputBuffer(byte[] data) {
> > +           for(int i = 0; i < data.length ; i++) {
> > +                   data[i] = outputBuffer[readCursor];
> > +
> > +                   readCursor++;
> > +
> > +                   if(readCursor >= OUTPUT_BUFFER_SIZE)
> > +                           readCursor = 0;
> > +           }
> > +
> > +           waiting -= data.length;
> > +
> > +           return true;
> > +   }
> > +
> > +   /**
> > +    * wait for the buffer being empty.
> > +    */
> > +   public void flush() {
> > +           while(waiting > 0) {
> > +                   sleep(INTERVAL);
> > +           }
> > +   }
> > +
> > +
> > +   public void run() {
> > +           byte[] data;
> > +
> > +           while(running) { /* Wild and freeeeeee */
> > +                   if(waiting > 0) {
> > +                           int to_read = packetSize;
> > +
> > +                           if(waiting < to_read)
> > +                                   to_read = waiting;
> > +
> > +                           data = new byte[to_read];
> > +
> > +                           readOutputBuffer(data);
> > +
> > +                           connection.realRawWrite(data);
> > +                   }
> > +
> > +                   sleep(INTERVAL);
> > +           }
> > +   }
> > +
> > +   /**
> > +    * Start the thread sending data from the buffer to the OutputStream
> > (socket). +  */
> > +   public boolean startSender() {
> > +           running = true;
> > +
> > +           if(maxUploadSpeed < 0) {
> > +                   Logger.notice(this, "startSender(): No upload limit. 
> > Not needed");
> > +                   return false;
> > +           }
> > +
> > +           if(tractopelle == null) {
> > +                   tractopelle = new Thread(this);
> > +                   tractopelle.start();
> > +                   return true;
> > +           } else {
> > +                   Logger.notice(this, "startSender(): Already started");
> > +                   return false;
> > +           }
> > +   }
> > +
> > +   public boolean stopSender() {
> > +           running = false;
> > +           tractopelle = null;
> > +           return true;
> > +   }
> > +
> > +   /**
> > +    * Just ignore the InterruptedException.
> > +    */
> > +   private void sleep(int ms) {
> > +           try {
> > +                   Thread.sleep(ms);
> > +           } catch(java.lang.InterruptedException e) {
> > +                   /* just iggnnnnnooored */
> > +           }
> > +   }
> > +}
> >
> > Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java  2006-07-15 21:40:40
> > UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java      2006-07-16 15:49:09 UTC
> > (rev 9630) @@ -162,9 +162,9 @@
> >             queueManager.getQueryManager().addObserver(this);
> >
> >             progress = 0;
> > -           running = true;
> >             finished = false;
> >             successful = false;
> > +           running = false;
> >
> >             if(keyType == 2 && privateKey == null) {
> >                     generateSSK();
> > @@ -240,6 +240,8 @@
> >     }
> >
> >     public boolean continueInsert() {
> > +           running = true; /* Here we are really running */
> > +
> >             FCPConnection connection =
> > queueManager.getQueryManager().getConnection();
> >
> >             connection.lockWriting();
> > @@ -431,10 +433,11 @@
> >                             publicKey = msg.getValue("URI");
> >                             publicKey = publicKey.replaceAll("freenet:", 
> > "");
> >
> > -                           /*
> > +
> >                             if(keyType == 0)
> >                                     publicKey = publicKey + "/" +name;
> >
> > +                           /*
> >                             if(keyType > 0)
> >                                     publicKey = publicKey + "/" + name + 
> > "-" + (new
> > Integer(rev)).toString(); */
> > @@ -454,7 +457,7 @@
> >                             publicKey = publicKey.replaceAll("freenet:", 
> > "");
> >
> >                             if(keyType == 0)
> > -                                   publicKey = publicKey + name;
> > +                                   publicKey = publicKey + "/" + name;
> >                             if(keyType == 1)
> >                                     publicKey = "KSK@"+name+"-" + (new 
> > Integer(rev)).toString();
> >                             if(keyType == 2)
> > @@ -603,20 +606,26 @@
> >             setChanged();
> >             notifyObservers();
> >
> > -           FCPMessage msg = new FCPMessage();
> > -           msg.setMessageName("RemovePersistentRequest");
> > -           msg.setValue("Identifier", identifier);
> > -
> > -           if(global)
> > -                   msg.setValue("Global", "true");
> > -           else
> > -                   msg.setValue("Global", "false");
> > +           if(isRunning() || isFinished()) {
> > +                   FCPMessage msg = new FCPMessage();
> > +                   msg.setMessageName("RemovePersistentRequest");
> > +                   msg.setValue("Identifier", identifier);
> > +
> > +                   if(global)
> > +                           msg.setValue("Global", "true");
> > +                   else
> > +                           msg.setValue("Global", "false");
> > +
> > +                   queueManager.getQueryManager().writeMessage(msg);
> > +
> > +                   running = false;
> > +
> > +                   queueManager.getQueryManager().deleteObserver(this);
> > +           } else {
> > +                   Logger.notice(this, "Nothing to remove");
> > +           }
> >
> > -           queueManager.getQueryManager().writeMessage(msg);
> >
> > -           running = false;
> > -
> > -           queueManager.getQueryManager().deleteObserver(this);
> >             return true;
> >     }
> >
> >
> > Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-15 21:40:40
> > UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java     2006-07-16 15:49:09 UTC
> > (rev 9630) @@ -17,9 +17,13 @@
> >   * After being instanciated, you should commit it to the
> > FCPQueryManager, and then * commit the FCPQueryManager to the
> > FCPQueueManager.
> >   * Call observer when connected / disconnected.
> > + * WARNING: This FCP implement don't guarantee that messages are sent in
> > the same order than initally put + *          if the lock on writting is
> > not set !
> >   * TODO: Add functions socketToFile(long size, File file) /
> > fileToSocket(File file) */
> >  public class FCPConnection extends Observable {
> > +   private FCPBufferedStream bufferedOut = null;
> > +   private int maxUploadSpeed = 0;
> >
> >     private String nodeAddress = null;
> >     private int port = 0;
> > @@ -41,9 +45,11 @@
> >
> >     /**
> >      * Don't connect. Call connect() for that.
> > +    * @param maxUploadSpeed in KB: -1 means no limit
> >      */
> >     public FCPConnection(String nodeAddress,
> > -                        int port)
> > +                        int port,
> > +                        int maxUploadSpeed)
> >     {
> >             if(DEBUG_MODE) {
> >                     Logger.notice(this, "DEBUG_MODE ACTIVATED");
> > @@ -51,22 +57,21 @@
> >
> >             setNodeAddress(nodeAddress);
> >             setNodePort(port);
> > +           setMaxUploadSpeed(maxUploadSpeed);
> >     }
> >
> >
> > -   /**
> > -    * You will probably have to use resetQueue() from the FCPQueueManager
> > after using this function. -         */
> >     public void setNodeAddress(String nodeAddress) {
> >             this.nodeAddress = nodeAddress;
> >     }
> >
> > -   /**
> > -    * You will probably have to use resetQueue() from the FCPQueueManager
> > after using this function. -         */
> >     public void setNodePort(int port) {
> >             this.port = port;
> >     }
> > +
> > +   public void setMaxUploadSpeed(int max) {
> > +           this.maxUploadSpeed = max;
> > +   }
> >
> >
> >     public void disconnect() {
> > @@ -83,6 +88,8 @@
> >             socket = null;
> >             in = null;
> >             out = null;
> > +           bufferedOut.stopSender();
> > +           bufferedOut = null;
> >
> >             setChanged();
> >             notifyObservers();
> > @@ -133,6 +140,8 @@
> >             }
> >
> >             reader = new BufferedInputStream(in);
> > +           bufferedOut = new FCPBufferedStream(this, maxUploadSpeed);
> > +           bufferedOut.startSender();
> >
> >             Logger.info(this, "Connected");
> >
> > @@ -167,6 +176,13 @@
> >      * Doesn't check the lock state ! You have to manage it yourself.
> >      */
> >     public boolean rawWrite(byte[] data) {
> > +           return bufferedOut.write(data);
> > +   }
> > +
> > +   /**
> > +    * Should be call by FCPBufferedStream. Not you.
> > +    */
> > +   public boolean realRawWrite(byte[] data) {
> >             if(out != null && socket != null && socket.isConnected()) {
> >                     try {
> >                             out.write(data);
> > @@ -182,11 +198,11 @@
> >             return true;
> >     }
> >
> > -   public synchronized boolean write(String toWrite) {
> > +   public boolean write(String toWrite) {
> >             return write(toWrite, true);
> >     }
> >
> > -   public synchronized boolean write(String toWrite, boolean checkLock) {
> > +   public boolean write(String toWrite, boolean checkLock) {
> >
> >             if(checkLock && lockWriting) {
> >                     Logger.verbose(this, "Writting lock, unable to write.");
> > @@ -205,13 +221,7 @@
> >
> >
> >             if(out != null && socket != null && socket.isConnected()) {
> > -                   try {
> > -                           out.write(toWrite.getBytes());
> > -                   } catch(java.io.IOException e) {
> > -                           Logger.warning(this, "Unable to write() on the 
> > socket ?! : "+
> > e.toString()); -                            disconnect();
> > -                           return false;
> > -                   }
> > +                   bufferedOut.write(toWrite.getBytes());
> >             } else {
> >                     Logger.warning(this, "Cannot write if disconnected 
> > !\n");
> >                     return false;
> > @@ -336,20 +346,4 @@
> >             return null;
> >     }
> >
> > -
> > -   /**
> > -    * Use this when you want to fetch the data still waiting on the
> > socket. -    */
> > -   public InputStream getInputStream() {
> > -           return in;
> > -   }
> > -
> > -
> > -   /**
> > -    * Use this when you want to send raw data.
> > -    */
> > -   public OutputStream getOutputStream() {
> > -           return out;
> > -   }
> > -
> >  }
> >
> > Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/i18n/thaw.properties   2006-07-15 21:40:40 UTC
> > (rev 9629) +++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties        
> > 2006-07-16
> > 15:49:09 UTC (rev 9630) @@ -83,6 +83,9 @@
> >
> >  thaw.config.pluginsLoaded=Plugins loaded:
> >
> > +thaw.config.maxUploadSpeed=Maximum upload speed in KB/s (-1 = unlimited)
> > +
> > +
> >  ## Plugins
> >  thaw.plugin.insert.fileToInsert=File to insert
> >  thaw.plugin.insert.filesToInsert=File(s) to insert
> >
> > Modified: trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
> > ===================================================================
> > --- trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties        2006-07-15 
> > 21:40:40
> > UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties    2006-07-16 15:49:09 UTC
> > (rev 9630) @@ -15,7 +15,7 @@
> >  thaw.common.insertions=Insertions
> >  thaw.common.downloads=Downloads
> >
> > -thaw.common.fetch=Aller chercher
> > +thaw.common.fetch=R?cup?rer
> >
> >  thaw.common.file=Fichier
> >  thaw.common.progress=Progr?s
> > @@ -83,6 +83,8 @@
> >
> >  thaw.config.pluginsLoaded=Plugins charg?s:
> >
> > +thaw.config.maxUploadSpeed=Vitesse maximum d'envoi en Ko/s (-1 =
> > illimit?) +
> >  ##?Plugins
> >  thaw.plugin.insert.fileToInsert=Fichier ? ins?rer
> >  thaw.plugin.insert.filesToInsert=Fichier(s) ? ins?rer
> >
> > Modified: trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> > ===================================================================
> > ---
> > trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java      
> > 2006-07-15
> > 21:40:40 UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java      
> > 2006-07-16
> > 15:49:09 UTC (rev 9630) @@ -220,7 +220,7 @@
> >
> >             mainPanel.setSize(400, 400);
> >
> > -           globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 50, 
> > 50));
> > +           globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 10, 
> > 10));
> >
> >             globalPanel.add(mainPanel);
> >     }
> >
> > Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> > ===================================================================
> > ---
> > trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java      
> > 2006-07-15
> > 21:40:40 UTC (rev 9629) +++
> > trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java      
> > 2006-07-16
> > 15:49:09 UTC (rev 9630) @@ -153,7 +153,7 @@
> >                     else
> >                             
> > key.setText(I18n.getMessage("thaw.common.unknown"));
> >
> > -                   if(query.getFileSize() > 0)
> > +                   if(query.getFileSize() == 0)
> >                             
> > size.setText(I18n.getMessage("thaw.common.unknown"));
> >                     else
> >                             size.setText((new 
> > Long(query.getFileSize())).toString()+" B");
> >
> > _______________________________________________
> > Thaw mailing list
> > Thaw at freenetproject.org
> > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/thaw

-- 
Jerome Flesch.

Reply via email to