Author: jflesch
Date: 2006-07-16 15:49:09 +0000 (Sun, 16 Jul 2006)
New Revision: 9630

Added:
   trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
Modified:
   trunk/apps/Thaw/src/thaw/core/Config.java
   trunk/apps/Thaw/src/thaw/core/Core.java
   trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
   trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
   trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
   trunk/apps/Thaw/src/thaw/i18n/thaw.properties
   trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
   trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
   trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
Log:
Traffic shaper (for socket output only) is now implemented + various bug fixes

Modified: trunk/apps/Thaw/src/thaw/core/Config.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/Config.java   2006-07-15 21:40:40 UTC (rev 
9629)
+++ trunk/apps/Thaw/src/thaw/core/Config.java   2006-07-16 15:49:09 UTC (rev 
9630)
@@ -297,6 +297,7 @@
                setValue("nodePort", "9481");
                setValue("maxSimultaneousDownloads", "-1");
                setValue("maxSimultaneousInsertions", "-1");
+               setValue("maxUploadSpeed", "-1");
                setValue("thawId", "thaw_"+(new Integer((new 
Random()).nextInt(1000))).toString());
        }


Modified: trunk/apps/Thaw/src/thaw/core/Core.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/Core.java     2006-07-15 21:40:40 UTC (rev 
9629)
+++ trunk/apps/Thaw/src/thaw/core/Core.java     2006-07-16 15:49:09 UTC (rev 
9630)
@@ -137,7 +137,8 @@
                        }

                        connection = new 
FCPConnection(config.getValue("nodeAddress"),
-                                                      (new 
Integer(config.getValue("nodePort"))).intValue());
+                                                      (new 
Integer(config.getValue("nodePort"))).intValue(),
+                                                      (new 
Integer(config.getValue("maxUploadSpeed"))).intValue());

                        if(!connection.connect()) {
                                new WarningWindow(this, "Unable to connect to 
"+config.getValue("nodeAddress")+":"+

Modified: trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java  2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java  2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -29,6 +29,7 @@
                I18n.getMessage("thaw.config.nodePort"),
                I18n.getMessage("thaw.config.maxSimultaneousDownloads"),
                I18n.getMessage("thaw.config.maxSimultaneousInsertions"),
+               I18n.getMessage("thaw.config.maxUploadSpeed"),
                I18n.getMessage("thaw.config.thawId")
        };

@@ -37,6 +38,7 @@
                "nodePort",
                "maxSimultaneousDownloads",
                "maxSimultaneousInsertions",
+               "maxUploadSpeed",
                "thawId"
        };


Added: trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -0,0 +1,269 @@
+package thaw.fcp;
+
+import java.net.Socket;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.InputStreamReader;
+
+import thaw.core.Logger;
+
+/**
+ * Rate-limited output buffer sitting between FCPConnection and its socket.
+ * Only used by FCPConnection. Except in special situations, you shouldn't
+ * have to use it directly. Currently only used for output.
+ *
+ * If maxUploadSpeed is negative there is no limit and every write is handed
+ * directly to FCPConnection.realRawWrite(). Otherwise the data is stored in
+ * a ring buffer of OUTPUT_BUFFER_SIZE bytes and the sender thread (see
+ * startSender()) forwards at most one packet each 'INTERVAL' (in ms).
+ *
+ * Locking: write(byte[]) is synchronized on this object, so concurrent
+ * writers never interleave their data. The ring buffer state shared with
+ * the sender thread is guarded by bufferLock.
+ */
+public class FCPBufferedStream implements Runnable {
+       private final FCPConnection connection;
+       private final int maxUploadSpeed;
+
+       private byte outputBuffer[];
+
+       public final static int OUTPUT_BUFFER_SIZE = 102400;
+       public final static int INTERVAL = 200;
+
+       /* guards waiting, readCursor, writeCursor and tractopelle */
+       private final Object bufferLock = new Object();
+
+       private int waiting = 0; /* amount of data stored in the buffer */
+       private int readCursor = 0; /* where the next read will be */
+       private int writeCursor = 0; /* where the next write will be */
+
+       private Thread tractopelle = null; /* the sender thread */
+       private volatile boolean running = true; /* read by several threads */
+       private int packetSize = 0; /* max bytes sent per INTERVAL */
+
+
+       /**
+        * Doesn't start the sender thread. Call startSender() for that.
+        * @param connection used for the real writes (realRawWrite())
+        * @param maxUploadSpeed in KB/s; any negative value means no limit
+        */
+       public FCPBufferedStream(FCPConnection connection,
+                                int maxUploadSpeed) {
+               this.connection = connection;
+               this.maxUploadSpeed = maxUploadSpeed;
+
+               if(maxUploadSpeed >= 0) {
+                       outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
+
+                       /* long math: maxUploadSpeed * 1024 may overflow an int */
+                       long perInterval = (maxUploadSpeed * 1024L * INTERVAL) / 1000L;
+
+                       /* at least 1 byte, else a limit of 0 would never drain
+                        * the buffer and write() / flush() would block forever */
+                       if(perInterval < 1)
+                               perInterval = 1;
+
+                       /* a packet can never hold more than the whole buffer */
+                       if(perInterval > OUTPUT_BUFFER_SIZE)
+                               perInterval = OUTPUT_BUFFER_SIZE;
+
+                       packetSize = (int)perInterval;
+               }
+       }
+
+       /**
+        * Add to the buffer. Can block if buffer is full !
+        * Data bigger than OUTPUT_BUFFER_SIZE is queued piece by piece.
+        * @return false if the direct write failed (no limit set), or if the
+        *         buffer is full and the sender was stopped; true otherwise
+        */
+       public synchronized boolean write(byte[] data) {
+               if(maxUploadSpeed < 0) {
+                       return connection.realRawWrite(data);
+               }
+
+               boolean interrupted = false;
+               boolean stored = true;
+               int offset = 0;
+
+               synchronized(bufferLock) {
+                       while(offset < data.length) {
+                               if(waiting < OUTPUT_BUFFER_SIZE) {
+                                       offset += enqueue(data, offset);
+                                       continue;
+                               }
+
+                               /* full, and nobody will drain it anymore */
+                               if(!running) {
+                                       stored = false;
+                                       break;
+                               }
+
+                               if(!waitForBuffer())
+                                       interrupted = true;
+                       }
+               }
+
+               /* we kept waiting anyway: let the caller see the interruption */
+               if(interrupted)
+                       Thread.currentThread().interrupt();
+
+               return stored;
+       }
+
+       /**
+        * @see write(byte[])
+        */
+       public boolean write(String data) {
+               return write(data.getBytes());
+       }
+
+       /**
+        * Copy as much of data (starting at offset) as currently fits.
+        * bufferLock must be held.
+        * @return number of bytes copied
+        */
+       private int enqueue(byte[] data, int offset) {
+               int chunk = Math.min(OUTPUT_BUFFER_SIZE - waiting,
+                                    data.length - offset);
+               int head = Math.min(chunk, OUTPUT_BUFFER_SIZE - writeCursor);
+
+               /* copy up to the end of the array, then wrap to its start */
+               System.arraycopy(data, offset, outputBuffer, writeCursor, head);
+               System.arraycopy(data, offset + head, outputBuffer, 0,
+                                chunk - head);
+
+               writeCursor = (writeCursor + chunk) % OUTPUT_BUFFER_SIZE;
+               waiting += chunk;
+
+               return chunk;
+       }
+
+       /**
+        * Extract the next packet (at most packetSize bytes) from the buffer.
+        * @return the packet, or null if the buffer is empty
+        */
+       private byte[] takePacket() {
+               synchronized(bufferLock) {
+                       if(waiting <= 0)
+                               return null;
+
+                       byte[] packet = new byte[Math.min(packetSize, waiting)];
+                       int head = Math.min(packet.length,
+                                           OUTPUT_BUFFER_SIZE - readCursor);
+
+                       System.arraycopy(outputBuffer, readCursor, packet, 0, head);
+                       System.arraycopy(outputBuffer, 0, packet, head,
+                                        packet.length - head);
+
+                       readCursor = (readCursor + packet.length) % OUTPUT_BUFFER_SIZE;
+                       waiting -= packet.length;
+
+                       /* wake up writers waiting for room, and flush() */
+                       bufferLock.notifyAll();
+
+                       return packet;
+               }
+       }
+
+       /**
+        * Wait at most INTERVAL ms for the buffer to change.
+        * bufferLock must be held.
+        * @return false if the thread was interrupted while waiting
+        */
+       private boolean waitForBuffer() {
+               try {
+                       bufferLock.wait(INTERVAL);
+                       return true;
+               } catch(java.lang.InterruptedException e) {
+                       return false;
+               }
+       }
+
+       /**
+        * wait for the buffer being empty.
+        * Returns early if the sender is stopped: nothing would empty the
+        * buffer anymore.
+        */
+       public void flush() {
+               boolean interrupted = false;
+
+               synchronized(bufferLock) {
+                       while(waiting > 0 && running) {
+                               if(!waitForBuffer())
+                                       interrupted = true;
+                       }
+               }
+
+               if(interrupted)
+                       Thread.currentThread().interrupt();
+       }
+
+
+       /**
+        * Sender loop: each INTERVAL ms, send at most one packet through
+        * connection.realRawWrite(). Ends once stopSender() was called.
+        */
+       public void run() {
+               while(running) { /* Wild and freeeeeee */
+                       byte[] packet = takePacket();
+
+                       if(packet != null)
+                               connection.realRawWrite(packet);
+
+                       try {
+                               Thread.sleep(INTERVAL);
+                       } catch(java.lang.InterruptedException e) {
+                               /* not a stop request by itself:
+                                * 'running' is checked again right after */
+                       }
+               }
+       }
+
+       /**
+        * Start the thread sending data from the buffer to the OutputStream
+        * (socket).
+        * @return true if the thread was started; false if there is no upload
+        *         limit (no thread needed) or if it is already started
+        */
+       public boolean startSender() {
+               running = true;
+
+               if(maxUploadSpeed < 0) {
+                       Logger.notice(this,
+                                     "startSender(): No upload limit. Not needed");
+                       return false;
+               }
+
+               synchronized(bufferLock) {
+                       if(tractopelle != null) {
+                               Logger.notice(this, "startSender(): Already started");
+                               return false;
+                       }
+
+                       tractopelle = new Thread(this);
+                       tractopelle.start();
+               }
+
+               return true;
+       }
+
+       /**
+        * Ask the sender thread to stop; it leaves its loop at the next check.
+        * Also wakes up the threads blocked in write() or flush().
+        * Deliberately not synchronized on this object: a writer blocked in
+        * write() holds that monitor and would keep this call from returning.
+        * @return always true
+        */
+       public boolean stopSender() {
+               running = false;
+
+               synchronized(bufferLock) {
+                       tractopelle = null;
+                       bufferLock.notifyAll();
+               }
+
+               return true;
+       }
+}

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java      2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java      2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -162,9 +162,9 @@
                queueManager.getQueryManager().addObserver(this);

                progress = 0;
-               running = true;
                finished = false;
                successful = false;
+               running = false;

                if(keyType == 2 && privateKey == null) {
                        generateSSK();
@@ -240,6 +240,8 @@
        }

        public boolean continueInsert() {
+               running = true; /* Here we are really running */
+
                FCPConnection connection = 
queueManager.getQueryManager().getConnection();

                connection.lockWriting();
@@ -431,10 +433,11 @@
                                publicKey = msg.getValue("URI");
                                publicKey = publicKey.replaceAll("freenet:", 
"");

-                               /*
+                               
                                if(keyType == 0)
                                        publicKey = publicKey + "/" +name;

+                               /*
                                if(keyType > 0)
                                        publicKey = publicKey + "/" + name + 
"-" + (new Integer(rev)).toString();
                                */
@@ -454,7 +457,7 @@
                                publicKey = publicKey.replaceAll("freenet:", 
"");

                                if(keyType == 0)
-                                       publicKey = publicKey + name;
+                                       publicKey = publicKey + "/" + name;
                                if(keyType == 1)
                                        publicKey = "KSK@"+name+"-" + (new 
Integer(rev)).toString();
                                if(keyType == 2)
@@ -603,20 +606,26 @@
                setChanged();
                notifyObservers();

-               FCPMessage msg = new FCPMessage();
-               msg.setMessageName("RemovePersistentRequest");
-               msg.setValue("Identifier", identifier);
-               
-               if(global)
-                       msg.setValue("Global", "true");
-               else
-                       msg.setValue("Global", "false");
+               if(isRunning() || isFinished()) {
+                       FCPMessage msg = new FCPMessage();
+                       msg.setMessageName("RemovePersistentRequest");
+                       msg.setValue("Identifier", identifier);
+                       
+                       if(global)
+                               msg.setValue("Global", "true");
+                       else
+                               msg.setValue("Global", "false");
+                       
+                       queueManager.getQueryManager().writeMessage(msg);
+                       
+                       running = false;
+                       
+                       queueManager.getQueryManager().deleteObserver(this);
+               } else {
+                       Logger.notice(this, "Nothing to remove");
+               }

-               queueManager.getQueryManager().writeMessage(msg);

-               running = false;
-
-               queueManager.getQueryManager().deleteObserver(this);
                return true;
        }


Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java     2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java     2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -17,9 +17,13 @@
  * After being instanciated, you should commit it to the FCPQueryManager, and 
then
  * commit the FCPQueryManager to the FCPQueueManager.
  * Call observer when connected / disconnected.
+ * WARNING: This FCP implementation doesn't guarantee that messages are sent
+ *          in the order they were submitted unless the writing lock is set!
  * TODO: Add functions socketToFile(long size, File file) / fileToSocket(File 
file)
  */
 public class FCPConnection extends Observable {
+       private FCPBufferedStream bufferedOut = null;
+       private int maxUploadSpeed = 0;

        private String nodeAddress = null;
        private int port = 0;
@@ -41,9 +45,11 @@

        /**
         * Don't connect. Call connect() for that.
+        * @param maxUploadSpeed in KB: -1 means no limit
         */
        public FCPConnection(String nodeAddress,
-                            int port)
+                            int port,
+                            int maxUploadSpeed)
        {
                if(DEBUG_MODE) {
                        Logger.notice(this, "DEBUG_MODE ACTIVATED");
@@ -51,22 +57,21 @@

                setNodeAddress(nodeAddress);
                setNodePort(port);
+               setMaxUploadSpeed(maxUploadSpeed);
        }


-       /**
-        * You will probably have to use resetQueue() from the FCPQueueManager 
after using this function.
-        */
        public void setNodeAddress(String nodeAddress) {
                this.nodeAddress = nodeAddress;
        }

-       /**
-        * You will probably have to use resetQueue() from the FCPQueueManager 
after using this function.
-        */
        public void setNodePort(int port) {
                this.port = port;
        }
+
+       public void setMaxUploadSpeed(int max) {
+               this.maxUploadSpeed = max;
+       }


        public void disconnect() {
@@ -83,6 +88,8 @@
                socket = null;
                in = null;
                out = null;
+               bufferedOut.stopSender();
+               bufferedOut = null;

                setChanged();
                notifyObservers();
@@ -133,6 +140,8 @@
                }

                reader = new BufferedInputStream(in);
+               bufferedOut = new FCPBufferedStream(this, maxUploadSpeed);
+               bufferedOut.startSender();

                Logger.info(this, "Connected");

@@ -167,6 +176,13 @@
         * Doesn't check the lock state ! You have to manage it yourself.
         */
        public boolean rawWrite(byte[] data) {
+               return bufferedOut.write(data);
+       }
+
+       /**
+        * Should be call by FCPBufferedStream. Not you.
+        */
+       public boolean realRawWrite(byte[] data) {
                if(out != null && socket != null && socket.isConnected()) {
                        try {
                                out.write(data);
@@ -182,11 +198,11 @@
                return true;
        }

-       public synchronized boolean write(String toWrite) {
+       public boolean write(String toWrite) {
                return write(toWrite, true);
        }

-       public synchronized boolean write(String toWrite, boolean checkLock) {
+       public boolean write(String toWrite, boolean checkLock) {

                if(checkLock && lockWriting) {
                        Logger.verbose(this, "Writting lock, unable to write.");
@@ -205,13 +221,7 @@


                if(out != null && socket != null && socket.isConnected()) {
-                       try {
-                               out.write(toWrite.getBytes());
-                       } catch(java.io.IOException e) {
-                               Logger.warning(this, "Unable to write() on the 
socket ?! : "+ e.toString());
-                               disconnect();
-                               return false;
-                       }
+                       bufferedOut.write(toWrite.getBytes());
                } else {
                        Logger.warning(this, "Cannot write if disconnected 
!\n");
                        return false;
@@ -336,20 +346,4 @@
                return null;
        }

-
-       /**
-        * Use this when you want to fetch the data still waiting on the socket.
-        */
-       public InputStream getInputStream() {
-               return in;
-       }
-
-
-       /**
-        * Use this when you want to send raw data.
-        */
-       public OutputStream getOutputStream() {
-               return out;
-       }
-
 }

Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
===================================================================
--- trunk/apps/Thaw/src/thaw/i18n/thaw.properties       2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties       2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -83,6 +83,9 @@

 thaw.config.pluginsLoaded=Plugins loaded:

+thaw.config.maxUploadSpeed=Maximum upload speed in KB/s (-1 = unlimited)
+
+
 ## Plugins
 thaw.plugin.insert.fileToInsert=File to insert
 thaw.plugin.insert.filesToInsert=File(s) to insert

Modified: trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
===================================================================
--- trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties    2006-07-15 21:40:40 UTC 
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties    2006-07-16 15:49:09 UTC 
(rev 9630)
@@ -15,7 +15,7 @@
 thaw.common.insertions=Insertions
 thaw.common.downloads=Downloads

-thaw.common.fetch=Aller chercher
+thaw.common.fetch=Récupérer

 thaw.common.file=Fichier
 thaw.common.progress=Progrès
@@ -83,6 +83,8 @@

 thaw.config.pluginsLoaded=Plugins chargés:

+thaw.config.maxUploadSpeed=Vitesse maximum d'envoi en Ko/s (-1 = illimité)
+
 ## Plugins
 thaw.plugin.insert.fileToInsert=Fichier à insérer
 thaw.plugin.insert.filesToInsert=Fichier(s) à insérer

Modified: trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java      
2006-07-15 21:40:40 UTC (rev 9629)
+++ trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java      
2006-07-16 15:49:09 UTC (rev 9630)
@@ -220,7 +220,7 @@

                mainPanel.setSize(400, 400);

-               globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 50, 
50));
+               globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 10, 
10));

                globalPanel.add(mainPanel);
        }

Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java      
2006-07-15 21:40:40 UTC (rev 9629)
+++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java      
2006-07-16 15:49:09 UTC (rev 9630)
@@ -153,7 +153,7 @@
                        else
                                
key.setText(I18n.getMessage("thaw.common.unknown"));

-                       if(query.getFileSize() > 0)
+                       if(query.getFileSize() == 0)
                                
size.setText(I18n.getMessage("thaw.common.unknown"));
                        else
                                size.setText((new 
Long(query.getFileSize())).toString()+" B");


Reply via email to