Author: jflesch
Date: 2006-07-16 15:49:09 +0000 (Sun, 16 Jul 2006)
New Revision: 9630
Added:
trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
Modified:
trunk/apps/Thaw/src/thaw/core/Config.java
trunk/apps/Thaw/src/thaw/core/Core.java
trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
trunk/apps/Thaw/src/thaw/i18n/thaw.properties
trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
Log:
Traffic shaper (for socket output only) is now implemented + various bug fixes
Modified: trunk/apps/Thaw/src/thaw/core/Config.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-15 21:40:40 UTC (rev
9629)
+++ trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-16 15:49:09 UTC (rev
9630)
@@ -297,6 +297,7 @@
setValue("nodePort", "9481");
setValue("maxSimultaneousDownloads", "-1");
setValue("maxSimultaneousInsertions", "-1");
+ setValue("maxUploadSpeed", "-1");
setValue("thawId", "thaw_"+(new Integer((new
Random()).nextInt(1000))).toString());
}
Modified: trunk/apps/Thaw/src/thaw/core/Core.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/Core.java 2006-07-15 21:40:40 UTC (rev
9629)
+++ trunk/apps/Thaw/src/thaw/core/Core.java 2006-07-16 15:49:09 UTC (rev
9630)
@@ -137,7 +137,8 @@
}
connection = new
FCPConnection(config.getValue("nodeAddress"),
- (new
Integer(config.getValue("nodePort"))).intValue());
+ (new
Integer(config.getValue("nodePort"))).intValue(),
+ (new
Integer(config.getValue("maxUploadSpeed"))).intValue());
if(!connection.connect()) {
new WarningWindow(this, "Unable to connect to
"+config.getValue("nodeAddress")+":"+
Modified: trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -29,6 +29,7 @@
I18n.getMessage("thaw.config.nodePort"),
I18n.getMessage("thaw.config.maxSimultaneousDownloads"),
I18n.getMessage("thaw.config.maxSimultaneousInsertions"),
+ I18n.getMessage("thaw.config.maxUploadSpeed"),
I18n.getMessage("thaw.config.thawId")
};
@@ -37,6 +38,7 @@
"nodePort",
"maxSimultaneousDownloads",
"maxSimultaneousInsertions",
+ "maxUploadSpeed",
"thawId"
};
Added: trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -0,0 +1,166 @@
+package thaw.fcp;
+
+import java.net.Socket;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.InputStreamReader;
+
+import thaw.core.Logger;
+
+/**
+ * Rate-limiting output buffer placed between FCPConnection and the socket.
+ * Only used by FCPConnection; except in special situations, you shouldn't
+ * have to use it directly.
+ * Currently only used for output (it shouldn't be really useful for input).
+ *
+ * When an upload limit is set (maxUploadSpeed >= 0), written data is stored
+ * in a ring buffer and a sender thread forwards at most one packet to the
+ * connection every INTERVAL milliseconds. When the limit is negative, data
+ * is handed straight to FCPConnection.realRawWrite().
+ *
+ * The ring buffer state (waiting, readCursor, writeCursor) is guarded by
+ * this object's monitor.
+ */
+public class FCPBufferedStream implements Runnable {
+    private FCPConnection connection;
+    private int maxUploadSpeed;
+
+    private byte outputBuffer[];
+
+    /** Capacity of the ring buffer, in bytes. */
+    public final static int OUTPUT_BUFFER_SIZE = 102400;
+    /** Delay between two packets sent by the sender thread, in ms. */
+    public final static int INTERVAL = 200;
+
+    private int waiting = 0; /* amount of data stored in the buffer */
+    private int readCursor = 0; /* indicates where the next read will be */
+    private int writeCursor = 0; /* indicates where the next write will be */
+
+    private Thread senderThread = null;
+    /* volatile: set by start/stopSender(), polled by the other threads */
+    private volatile boolean running = true;
+    private int packetSize = 0; /* max bytes sent per INTERVAL */
+
+
+    /**
+     * @param connection connection receiving the data (realRawWrite())
+     * @param maxUploadSpeed in KB/s; any negative value means no limit
+     */
+    public FCPBufferedStream(FCPConnection connection,
+                             int maxUploadSpeed) {
+        this.connection = connection;
+        this.maxUploadSpeed = maxUploadSpeed;
+
+        if(maxUploadSpeed >= 0) {
+            outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
+
+            /* long arithmetic: maxUploadSpeed * 1024 can overflow an int */
+            long bytesPerInterval =
+                ((long)maxUploadSpeed * 1024L * INTERVAL) / 1000L;
+
+            /* A packet size of 0 would never drain the buffer and would
+             * block write() and flush() forever, so send at least 1 byte. */
+            if(bytesPerInterval < 1) {
+                bytesPerInterval = 1;
+            }
+
+            /* The buffer never holds more than OUTPUT_BUFFER_SIZE bytes. */
+            if(bytesPerInterval > OUTPUT_BUFFER_SIZE) {
+                bytesPerInterval = OUTPUT_BUFFER_SIZE;
+            }
+
+            packetSize = (int)bytesPerInterval;
+        }
+    }
+
+    /**
+     * Add to the buffer. Can block while the buffer is full !
+     * Data larger than OUTPUT_BUFFER_SIZE is queued piece by piece as the
+     * sender frees space. Without upload limit, the data is written
+     * directly on the connection.
+     *
+     * @return result of realRawWrite() when there is no upload limit;
+     *         otherwise true once everything is queued, or false if the
+     *         sender was stopped before all the data could be queued.
+     */
+    public synchronized boolean write(byte[] data) {
+        /* Same test as the constructor and startSender(): with any
+         * negative limit there is no buffer to write into. */
+        if(maxUploadSpeed < 0) {
+            return connection.realRawWrite(data);
+        }
+
+        boolean interrupted = false;
+        int offset = 0;
+
+        try {
+            while(offset < data.length) {
+                while(running && waiting >= OUTPUT_BUFFER_SIZE) {
+                    interrupted = waitForSender() || interrupted;
+                }
+
+                if(!running) {
+                    /* Nobody will drain the buffer anymore. */
+                    return false;
+                }
+
+                int chunk = Math.min(data.length - offset,
+                                     OUTPUT_BUFFER_SIZE - waiting);
+
+                enqueue(data, offset, chunk);
+                offset += chunk;
+            }
+        } finally {
+            /* The interruption doesn't abort the write, but the caller
+             * must still be able to see it. */
+            if(interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Converts the string with String.getBytes() and queues the result.
+     * @see #write(byte[])
+     */
+    public boolean write(String data) {
+        return write(data.getBytes());
+    }
+
+    /**
+     * Copy length bytes of data, starting at offset, in the ring buffer.
+     * The caller must hold the monitor and must have checked that there
+     * is enough free space.
+     */
+    private void enqueue(byte[] data, int offset, int length) {
+        /* The copy is split in two when it wraps around the buffer end. */
+        int firstPart = Math.min(length, OUTPUT_BUFFER_SIZE - writeCursor);
+
+        System.arraycopy(data, offset,
+                         outputBuffer, writeCursor, firstPart);
+        System.arraycopy(data, offset + firstPart,
+                         outputBuffer, 0, length - firstPart);
+
+        writeCursor = (writeCursor + length) % OUTPUT_BUFFER_SIZE;
+        waiting += length;
+    }
+
+    /**
+     * Extract at most packetSize bytes from the ring buffer.
+     * @return the extracted bytes, or null if the buffer is empty
+     */
+    private synchronized byte[] takePacket() {
+        if(waiting <= 0) {
+            return null;
+        }
+
+        int length = Math.min(waiting, packetSize);
+        byte[] packet = new byte[length];
+
+        int firstPart = Math.min(length, OUTPUT_BUFFER_SIZE - readCursor);
+
+        System.arraycopy(outputBuffer, readCursor,
+                         packet, 0, firstPart);
+        System.arraycopy(outputBuffer, 0,
+                         packet, firstPart, length - firstPart);
+
+        readCursor = (readCursor + length) % OUTPUT_BUFFER_SIZE;
+        waiting -= length;
+
+        /* Wake up the threads blocked in write() or flush(). */
+        notifyAll();
+
+        return packet;
+    }
+
+    /**
+     * Wait for the buffer being empty.
+     * Returns immediately when there is no upload limit, and gives up if
+     * the sender is stopped. The last packet taken from the buffer may
+     * still be on its way to the socket when this method returns.
+     */
+    public void flush() {
+        boolean interrupted = false;
+
+        synchronized(this) {
+            while(running && waiting > 0) {
+                interrupted = waitForSender() || interrupted;
+            }
+        }
+
+        if(interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+    /**
+     * Sender loop: every INTERVAL ms, forwards at most packetSize bytes
+     * from the buffer to the connection, until stopSender() is called.
+     */
+    public void run() {
+        while(running) {
+            byte[] packet = takePacket();
+
+            /* The socket write is done without holding the monitor, so
+             * writers can keep filling the buffer in the meantime. */
+            if(packet != null) {
+                connection.realRawWrite(packet);
+            }
+
+            sleep(INTERVAL);
+        }
+    }
+
+    /**
+     * Start the thread sending data from the buffer to the OutputStream
+     * (socket).
+     * @return true if the thread was started; false if there is no upload
+     *         limit or if the thread was already started
+     */
+    public boolean startSender() {
+        running = true;
+
+        if(maxUploadSpeed < 0) {
+            Logger.notice(this,
+                          "startSender(): No upload limit. Not needed");
+            return false;
+        }
+
+        if(senderThread == null) {
+            senderThread = new Thread(this);
+            senderThread.start();
+            return true;
+        } else {
+            Logger.notice(this, "startSender(): Already started");
+            return false;
+        }
+    }
+
+    /**
+     * Ask the sender thread to stop. Data still in the buffer is not
+     * sent. Threads blocked in write() or flush() give up on their next
+     * wake-up (at most INTERVAL ms later).
+     * @return always true
+     */
+    public boolean stopSender() {
+        running = false;
+        senderThread = null;
+        return true;
+    }
+
+    /**
+     * Release the monitor for at most INTERVAL ms, or until the sender
+     * signals that it freed some space. The caller must hold the monitor.
+     * @return true if the thread was interrupted while waiting
+     */
+    private boolean waitForSender() {
+        try {
+            wait(INTERVAL);
+            return false;
+        } catch(java.lang.InterruptedException e) {
+            return true;
+        }
+    }
+
+    /**
+     * Pause of the sender thread between two packets.
+     * The InterruptedException is deliberately ignored: this class never
+     * interrupts the sender thread, it is stopped through the running
+     * flag, and restoring the interrupt status here would turn the
+     * pacing loop of run() into a busy loop.
+     */
+    private void sleep(int ms) {
+        try {
+            Thread.sleep(ms);
+        } catch(java.lang.InterruptedException ignored) {
+            /* see above */
+        }
+    }
+}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -162,9 +162,9 @@
queueManager.getQueryManager().addObserver(this);
progress = 0;
- running = true;
finished = false;
successful = false;
+ running = false;
if(keyType == 2 && privateKey == null) {
generateSSK();
@@ -240,6 +240,8 @@
}
public boolean continueInsert() {
+ running = true; /* Here we are really running */
+
FCPConnection connection =
queueManager.getQueryManager().getConnection();
connection.lockWriting();
@@ -431,10 +433,11 @@
publicKey = msg.getValue("URI");
publicKey = publicKey.replaceAll("freenet:",
"");
- /*
+
if(keyType == 0)
publicKey = publicKey + "/" +name;
+ /*
if(keyType > 0)
publicKey = publicKey + "/" + name +
"-" + (new Integer(rev)).toString();
*/
@@ -454,7 +457,7 @@
publicKey = publicKey.replaceAll("freenet:",
"");
if(keyType == 0)
- publicKey = publicKey + name;
+ publicKey = publicKey + "/" + name;
if(keyType == 1)
publicKey = "KSK@"+name+"-" + (new
Integer(rev)).toString();
if(keyType == 2)
@@ -603,20 +606,26 @@
setChanged();
notifyObservers();
- FCPMessage msg = new FCPMessage();
- msg.setMessageName("RemovePersistentRequest");
- msg.setValue("Identifier", identifier);
-
- if(global)
- msg.setValue("Global", "true");
- else
- msg.setValue("Global", "false");
+ if(isRunning() || isFinished()) {
+ FCPMessage msg = new FCPMessage();
+ msg.setMessageName("RemovePersistentRequest");
+ msg.setValue("Identifier", identifier);
+
+ if(global)
+ msg.setValue("Global", "true");
+ else
+ msg.setValue("Global", "false");
+
+ queueManager.getQueryManager().writeMessage(msg);
+
+ running = false;
+
+ queueManager.getQueryManager().deleteObserver(this);
+ } else {
+ Logger.notice(this, "Nothing to remove");
+ }
- queueManager.getQueryManager().writeMessage(msg);
- running = false;
-
- queueManager.getQueryManager().deleteObserver(this);
return true;
}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -17,9 +17,13 @@
* After being instanciated, you should commit it to the FCPQueryManager, and
then
* commit the FCPQueryManager to the FCPQueueManager.
* Call observer when connected / disconnected.
+ * WARNING: This FCP implementation doesn't guarantee that messages are sent in
+ * the same order as they were initially put if the writing lock is not set!
* TODO: Add functions socketToFile(long size, File file) / fileToSocket(File
file)
*/
public class FCPConnection extends Observable {
+ private FCPBufferedStream bufferedOut = null;
+ private int maxUploadSpeed = 0;
private String nodeAddress = null;
private int port = 0;
@@ -41,9 +45,11 @@
/**
* Don't connect. Call connect() for that.
+ * @param maxUploadSpeed in KB: -1 means no limit
*/
public FCPConnection(String nodeAddress,
- int port)
+ int port,
+ int maxUploadSpeed)
{
if(DEBUG_MODE) {
Logger.notice(this, "DEBUG_MODE ACTIVATED");
@@ -51,22 +57,21 @@
setNodeAddress(nodeAddress);
setNodePort(port);
+ setMaxUploadSpeed(maxUploadSpeed);
}
- /**
- * You will probably have to use resetQueue() from the FCPQueueManager
after using this function.
- */
public void setNodeAddress(String nodeAddress) {
this.nodeAddress = nodeAddress;
}
- /**
- * You will probably have to use resetQueue() from the FCPQueueManager
after using this function.
- */
public void setNodePort(int port) {
this.port = port;
}
+
+ public void setMaxUploadSpeed(int max) {
+ this.maxUploadSpeed = max;
+ }
public void disconnect() {
@@ -83,6 +88,8 @@
socket = null;
in = null;
out = null;
+ bufferedOut.stopSender();
+ bufferedOut = null;
setChanged();
notifyObservers();
@@ -133,6 +140,8 @@
}
reader = new BufferedInputStream(in);
+ bufferedOut = new FCPBufferedStream(this, maxUploadSpeed);
+ bufferedOut.startSender();
Logger.info(this, "Connected");
@@ -167,6 +176,13 @@
* Doesn't check the lock state ! You have to manage it yourself.
*/
public boolean rawWrite(byte[] data) {
+ return bufferedOut.write(data);
+ }
+
+ /**
+ * Should only be called by FCPBufferedStream, not by other callers.
+ */
+ public boolean realRawWrite(byte[] data) {
if(out != null && socket != null && socket.isConnected()) {
try {
out.write(data);
@@ -182,11 +198,11 @@
return true;
}
- public synchronized boolean write(String toWrite) {
+ public boolean write(String toWrite) {
return write(toWrite, true);
}
- public synchronized boolean write(String toWrite, boolean checkLock) {
+ public boolean write(String toWrite, boolean checkLock) {
if(checkLock && lockWriting) {
Logger.verbose(this, "Writting lock, unable to write.");
@@ -205,13 +221,7 @@
if(out != null && socket != null && socket.isConnected()) {
- try {
- out.write(toWrite.getBytes());
- } catch(java.io.IOException e) {
- Logger.warning(this, "Unable to write() on the
socket ?! : "+ e.toString());
- disconnect();
- return false;
- }
+ bufferedOut.write(toWrite.getBytes());
} else {
Logger.warning(this, "Cannot write if disconnected
!\n");
return false;
@@ -336,20 +346,4 @@
return null;
}
-
- /**
- * Use this when you want to fetch the data still waiting on the socket.
- */
- public InputStream getInputStream() {
- return in;
- }
-
-
- /**
- * Use this when you want to send raw data.
- */
- public OutputStream getOutputStream() {
- return out;
- }
-
}
Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
===================================================================
--- trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -83,6 +83,9 @@
thaw.config.pluginsLoaded=Plugins loaded:
+thaw.config.maxUploadSpeed=Maximum upload speed in KB/s (-1 = unlimited)
+
+
## Plugins
thaw.plugin.insert.fileToInsert=File to insert
thaw.plugin.insert.filesToInsert=File(s) to insert
Modified: trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
===================================================================
--- trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties 2006-07-15 21:40:40 UTC
(rev 9629)
+++ trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties 2006-07-16 15:49:09 UTC
(rev 9630)
@@ -15,7 +15,7 @@
thaw.common.insertions=Insertions
thaw.common.downloads=Downloads
-thaw.common.fetch=Aller chercher
+thaw.common.fetch=Récupérer
thaw.common.file=Fichier
thaw.common.progress=Progrès
@@ -83,6 +83,8 @@
thaw.config.pluginsLoaded=Plugins chargés:
+thaw.config.maxUploadSpeed=Vitesse maximum d'envoi en Ko/s (-1 = illimité)
+
## Plugins
thaw.plugin.insert.fileToInsert=Fichier à insérer
thaw.plugin.insert.filesToInsert=Fichier(s) à insérer
Modified: trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
2006-07-15 21:40:40 UTC (rev 9629)
+++ trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
2006-07-16 15:49:09 UTC (rev 9630)
@@ -220,7 +220,7 @@
mainPanel.setSize(400, 400);
- globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 50,
50));
+ globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 10,
10));
globalPanel.add(mainPanel);
}
Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
2006-07-15 21:40:40 UTC (rev 9629)
+++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
2006-07-16 15:49:09 UTC (rev 9630)
@@ -153,7 +153,7 @@
else
key.setText(I18n.getMessage("thaw.common.unknown"));
- if(query.getFileSize() > 0)
+ if(query.getFileSize() == 0)
size.setText(I18n.getMessage("thaw.common.unknown"));
else
size.setText((new
Long(query.getFileSize())).toString()+" B");