Author: jflesch
Date: 2006-07-10 23:27:27 +0000 (Mon, 10 Jul 2006)
New Revision: 9552
Modified:
trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java
trunk/apps/Thaw/src/thaw/fcp/FCPQueryManager.java
trunk/apps/Thaw/src/thaw/fcp/FCPTransferQuery.java
trunk/apps/Thaw/src/thaw/i18n/thaw.properties
trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueuePanel.java
trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueueTableModel.java
Log:
Fix some selection problems
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -148,7 +148,7 @@
fileSize = (new
Long(message.getValue("DataLength"))).longValue();
- if(globalQueue) {
+ if(isPersistent() && !isFinished()) {
FCPMessage getRequestStatus = new FCPMessage();
getRequestStatus.setMessageName("GetRequestStatus");
@@ -160,6 +160,7 @@
getRequestStatus.setValue("OnlyData", "true");
queueManager.getQueryManager().writeMessage(getRequestStatus);
+
}
@@ -254,20 +255,36 @@
status = "Writing";
- if(fetchDirectly(fileSize)) {
+
//queueManager.getQueryManager().getConnection().lockWriting();
+
+
+ if(fetchDirectly(fileSize, true)) {
successful = true;
status = "Available";
+ } else {
+ Logger.warning(this, "Unable to fetch correctly
the file. This may create problems on socket");
}
+
+
//queueManager.getQueryManager().getConnection().unlockWriting();
running = false;
progress = 100;
queueManager.getQueryManager().deleteObserver(this);
+ if(isPersistent())
+ removePersistent();
+
+ setChanged();
+ notifyObservers();
+
+
return;
}
if(message.getMessageName().equals("PersistentGet")) {
+ Logger.debug(this, "PersistentGet !");
+
status = "Fetching";
return;
}
@@ -277,21 +294,27 @@
}
- public boolean fetchDirectly(long size) {
+ public boolean fetchDirectly(long size, boolean reallyWrite) {
FCPConnection connection;
+
File newFile = new File(getPath());
- FileOutputStream fileWriter;
+ FileOutputStream fileWriter = null;
+
connection = queueManager.getQueryManager().getConnection();
- Logger.info(this, "Writing file to disk ...");
-
- try {
- fileWriter = new FileOutputStream(newFile);
- } catch(java.io.IOException e) {
- Logger.error(this, "Unable to write file on disk ...
perms ? : "+e.toString());
- status = "Write error";
- return false;
+ if(reallyWrite) {
+ Logger.info(this, "Writing file to disk ...");
+
+ try {
+ fileWriter = new FileOutputStream(newFile);
+ } catch(java.io.IOException e) {
+ Logger.error(this, "Unable to write file on
disk ... perms ? : "+e.toString());
+ status = "Write error";
+ return false;
+ }
+ } else {
+ Logger.info(this, "File is supposed already written.
Not rewriting.");
}
/* size == bytes remaining on socket */
@@ -313,22 +336,26 @@
break;
}
- try {
- fileWriter.write(read, 0, amount);
- } catch(java.io.IOException e) {
- Logger.error(this, "Unable to write file on
disk ... out of space ? : "+e.toString());
- status = "Write error";
- return false;
+ if(reallyWrite) {
+ try {
+ fileWriter.write(read, 0, amount);
+ } catch(java.io.IOException e) {
+ Logger.error(this, "Unable to write
file on disk ... out of space ? : "+e.toString());
+ status = "Write error";
+ return false;
+ }
}
size = size - amount;
}
- try {
- fileWriter.close();
- } catch(java.io.IOException e) {
- Logger.notice(this, "Unable to close correctly file on
disk !? : "+e.toString());
+ if(reallyWrite) {
+ try {
+ fileWriter.close();
+ } catch(java.io.IOException e) {
+ Logger.notice(this, "Unable to close correctly
file on disk !? : "+e.toString());
+ }
}
Logger.info(this, "File written");
@@ -336,7 +363,23 @@
return true;
}
+
+ private void removePersistent() {
+ FCPMessage stopMessage = new FCPMessage();
+
+ stopMessage.setMessageName("RemovePersistentRequest");
+
+ if(globalQueue)
+ stopMessage.setValue("Global", "true");
+ else
+ stopMessage.setValue("Global", "false");
+
+ stopMessage.setValue("Identifier", identifier);
+
+ queueManager.getQueryManager().writeMessage(stopMessage);
+ }
+
public boolean stop(FCPQueueManager queryManager) {
Logger.info(this, "Stop fetching of the key : "+getFileKey());
@@ -346,18 +389,7 @@
}
if(isPersistent()) {
- FCPMessage stopMessage = new FCPMessage();
-
- stopMessage.setMessageName("RemovePersistentRequest");
-
- if(globalQueue)
- stopMessage.setValue("Global", "true");
- else
- stopMessage.setValue("Global", "false");
-
- stopMessage.setValue("Identifier", identifier);
-
-
queueManager.getQueryManager().writeMessage(stopMessage);
+ removePersistent();
} else {
Logger.warning(this, "Can't stop a non-persistent
query, will continue in background ...");
return false;
@@ -406,6 +438,10 @@
return attempt;
}
+ public void setAttempt(int x) {
+ attempt = x;
+ }
+
public int getMaxAttempt() {
return MAX_RETRIES;
}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -29,6 +29,10 @@
private BufferedInputStream reader = null;
+ private long rawBytesWaiting = 0;
+
+ private boolean lockWriting = false;
+
/** If == 1, then will print on stdout
* all fcp input / output.
*/
@@ -141,9 +145,27 @@
return socket.isConnected();
}
+
+ public void lockWriting() {
+ lockWriting = true;
+ }
+ public void unlockWriting() {
+ lockWriting = false;
+ }
+
public synchronized boolean write(String toWrite) {
+
+ while(lockWriting) {
+ Logger.verbose(this, "Writting lock, unable to write.");
+ try {
+ Thread.sleep(200);
+ } catch(java.lang.InterruptedException e) {
+ /* On s'en fout, mais alors d'une force ... */
+ }
+ }
+
Logger.asIt(this, "Thaw >>> Node :");
Logger.asIt(this, toWrite);
@@ -162,14 +184,32 @@
return true;
}
+ /**
+ * For security : FCPQueryManager uses this function to tells
FCPConnection
+ * how many raw bytes are waited (to avoid to *serious* problems).
+ */
+ public void setRawDataWaiting(long waiting) {
+ rawBytesWaiting = waiting;
+ }
- public int read(int lng, byte[] buf) {
-
+ /**
+ * @param lng Obsolete.
+ */
+ public synchronized int read(int lng, byte[] buf) {
+ int rdBytes = 0;
try {
- return reader.read(buf);
+ rdBytes = reader.read(buf);
+
+ rawBytesWaiting = rawBytesWaiting - rdBytes;
+
+ Logger.verbose(this, "Remaining: "+rawBytesWaiting);
+
+ return rdBytes;
} catch(java.io.IOException e) {
- Logger.warning(this, "IOException while reading on
socket");
- return -1;
+ Logger.error(this, "IOException while reading raw bytes
on socket, will probably cause troubles");
+ Logger.error(this, e.getMessage() + ":" +e.getCause());
+ System.exit(3);
+ return -2; /* -1 can mean eof */
}
}
@@ -179,6 +219,28 @@
* @return null if disconnected or error
*/
public String readLine() {
+
+ /* SECURITY */
+ if(rawBytesWaiting > 0) {
+ Logger.error(this, "RAW BYTES STILL WAITING ON SOCKET.
THIS IS ABNORMAL.");
+ Logger.error(this, "Will drop them.");
+
+ while(rawBytesWaiting > 0) {
+ int to_read = 1024;
+
+ if(to_read > rawBytesWaiting)
+ to_read = (int)rawBytesWaiting;
+
+ byte[] read = new byte[to_read];
+ read(to_read, read);
+
+ rawBytesWaiting = rawBytesWaiting - to_read;
+ }
+
+ }
+
+
+
String result;
if(in != null && reader != null && socket != null &&
socket.isConnected()) {
@@ -206,7 +268,11 @@
}
- Logger.asIt(this, "Thaw <<< Node : "+result);
+
+ if(result.matches("[- \\?.a-zA-Z0-9,~%@/_=]*"))
+ Logger.asIt(this, "Thaw <<< Node :
"+result);
+ else
+ Logger.error(this, "PROBABLE RAW
MESSAGE. ABNORMAL.");
return result;
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -55,6 +55,11 @@
String[] affectation = lines[i].split("=");
+ if(affectation.length < 2) {
+ Logger.notice(this, "Malformed message");
+ continue;
+ }
+
setValue(affectation[0], affectation[1]);
}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQueryManager.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQueryManager.java 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQueryManager.java 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -48,8 +48,12 @@
public FCPMessage readMessage() {
String whatsUp = new String("");
FCPMessage result = new FCPMessage();
+ boolean withData;
+
+ withData = false;
while(true) {
+
String read = new String("");
read = connection.readLine();
@@ -59,10 +63,15 @@
return null;
}
- if(read.equals("Data") || read.equals("EndMessage")) {
+ if(read.equals("Data")) {
+ withData = true;
break;
}
+ if(read.equals("EndMessage")) {
+ break;
+ }
+
whatsUp = whatsUp + read + "\n";
}
@@ -70,6 +79,12 @@
result.loadFromRawMessage(whatsUp);
+ if(withData) {
+ long dataWaiting = (new
Long(result.getValue("DataLength"))).longValue();
+ connection.setRawDataWaiting(dataWaiting);
+ Logger.info(this, "Achtung data: "+(new
Long(dataWaiting)).toString());
+ }
+
return result;
}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPTransferQuery.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPTransferQuery.java 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPTransferQuery.java 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -54,6 +54,8 @@
*/
public int getAttempt();
+ public void setAttempt(int x);
+
/**
* @return can return -1
*/
Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
===================================================================
--- trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-10 21:05:40 UTC
(rev 9551)
+++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-10 23:27:27 UTC
(rev 9552)
@@ -8,6 +8,7 @@
thaw.common.add=Add
thaw.common.remove=Remove
thaw.common.status=Status
+thaw.common.identifier=Identifier
thaw.common.insertion=Insertion
thaw.common.download=Download
@@ -38,10 +39,11 @@
thaw.common.priority=Priority
-thaw.common.remove=Remove
+thaw.common.removeFromTheList=Remove from the list
thaw.common.cancel=Cancel
thaw.common.delay=Delay
thaw.common.copyKeysToClipboard=Copy keys to clipboard
+thaw.common.forceRestart=Force restart
## Errors
thaw.error.idAlreadyUsed=Unable to connect. Our Id is already used by another
client connected to the node.
Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
2006-07-10 21:05:40 UTC (rev 9551)
+++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
2006-07-10 23:27:27 UTC (rev 9552)
@@ -36,6 +36,7 @@
private JTextField path = new JTextField();
private JTextField priority = new JTextField();
private JTextField attempt = new JTextField();
+ private JTextField identifier = new JTextField();
private FCPTransferQuery query = null;
@@ -56,7 +57,9 @@
I18n.getMessage("thaw.common.key"),
I18n.getMessage("thaw.common.localPath"),
I18n.getMessage("thaw.common.priority"),
- I18n.getMessage("thaw.common.try")+" #"
};
+ I18n.getMessage("thaw.common.try")+" #",
+
I18n.getMessage("thaw.common.identifier")
+ };
subPanel.setLayout(new GridLayout(fieldNames.length*2, 1));
@@ -81,6 +84,7 @@
case(5): field = path; path.setEditable(false);
break;
case(6): field = priority;
priority.setEditable(false); break;
case(7): field = attempt;
attempt.setEditable(false); break;
+ case(8): field = identifier;
identifier.setEditable(false); break;
default: Logger.error(this, "Gouli goula ? ...
is going to crash :p"); break;
}
@@ -126,10 +130,15 @@
else
progress.setString("FAILED");
status.setText(query.getStatus());
+ if(query.getIdentifier() != null)
+ identifier.setText(query.getIdentifier());
+ else
+ identifier.setText("N/A");
} else {
progress.setValue(0);
progress.setString("");
status.setText("");
+ identifier.setText("");
}
}
Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueuePanel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueuePanel.java
2006-07-10 21:05:40 UTC (rev 9551)
+++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueuePanel.java
2006-07-10 23:27:27 UTC (rev 9552)
@@ -17,6 +17,8 @@
import java.awt.event.MouseListener;
import java.awt.event.MouseEvent;
+import java.awt.event.KeyListener;
+import java.awt.event.KeyEvent;
import javax.swing.JPopupMenu;
import javax.swing.JMenuItem;
@@ -33,7 +35,7 @@
import thaw.fcp.*;
-public class QueuePanel implements MouseListener, ActionListener,
ClipboardOwner {
+public class QueuePanel implements MouseListener, ActionListener,
ClipboardOwner, KeyListener {
private Core core;
private JLabel label;
@@ -50,10 +52,12 @@
private JMenuItem removeItem;
private JMenuItem cancelItem;
private JMenuItem delayItem;
+ private JMenuItem forceRestartItem;
private JMenuItem copyKeysItem;
private int lastRowSelected = -1; /* Used for detail panel */
private int[] selectedRows;
+ private Vector queries;
private boolean insertionQueue = false;
@@ -88,20 +92,26 @@
tableModel.addTableModelListener(table);
rightClickMenu = new JPopupMenu();
- removeItem = new
JMenuItem(I18n.getMessage("thaw.common.remove"));
+ removeItem = new
JMenuItem(I18n.getMessage("thaw.common.removeFromTheList"));
cancelItem = new
JMenuItem(I18n.getMessage("thaw.common.cancel"));
delayItem = new JMenuItem(I18n.getMessage("thaw.common.delay"));
+ forceRestartItem = new
JMenuItem(I18n.getMessage("thaw.common.forceRestart"));
copyKeysItem = new
JMenuItem(I18n.getMessage("thaw.common.copyKeysToClipboard"));
rightClickMenu.add(removeItem);
rightClickMenu.add(cancelItem);
+ rightClickMenu.add(delayItem);
+ rightClickMenu.add(forceRestartItem);
rightClickMenu.add(copyKeysItem);
removeItem.addActionListener(this);
cancelItem.addActionListener(this);
copyKeysItem.addActionListener(this);
+ forceRestartItem.addActionListener(this);
+ delayItem.addActionListener(this);
table.addMouseListener(this);
+ table.addKeyListener(this);
/* If a queue is already existing, we need to add it */
addToTable(core.getQueueManager().getRunningQueue());
@@ -207,7 +217,10 @@
String keys = "";
for(int i = 0 ; i < selectedRows.length;i++) {
- FCPTransferQuery query = tableModel.getQuery(i);
+ FCPTransferQuery query =
(FCPTransferQuery)queries.get(selectedRows[i]);
+
+ if(query == null)
+ continue;
if(e.getSource() == removeItem) {
if(query.isRunning() &&
!query.isFinished())
@@ -229,6 +242,14 @@
}
}
+ if(e.getSource() == forceRestartItem) {
+ if(query.isRunning() &&
!query.isFinished())
+
query.stop(core.getQueueManager());
+
+ query.setAttempt(0);
+ query.start(core.getQueueManager());
+ }
+
if(e.getSource() == copyKeysItem) {
keys = keys + query.getFileKey() + "\n";
}
@@ -242,11 +263,13 @@
Clipboard cp = tk.getSystemClipboard();
cp.setContents(st, this);
}
+
}
public void mouseClicked(MouseEvent e) {
if(e.getButton() == MouseEvent.BUTTON3) {
selectedRows = table.getSelectedRows();
+ queries = tableModel.getQueries();
rightClickMenu.show(e.getComponent(), e.getX(),
e.getY());
}
@@ -275,5 +298,10 @@
/* we dont care */
}
+
+ public void keyPressed(KeyEvent e) { }
+
+ public void keyReleased(KeyEvent e) { refresh(); }
+ public void keyTyped(KeyEvent e) { }
}
Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueueTableModel.java
===================================================================
--- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueueTableModel.java
2006-07-10 21:05:40 UTC (rev 9551)
+++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/QueueTableModel.java
2006-07-10 23:27:27 UTC (rev 9552)
@@ -79,7 +79,8 @@
return false;
}
- public void resetTable() {
+ public synchronized void resetTable() {
+
if(queries != null) {
for(Iterator it = queries.iterator();
it.hasNext();) {
@@ -93,7 +94,7 @@
notifyObservers();
}
- public void addQuery(FCPTransferQuery query) {
+ public synchronized void addQuery(FCPTransferQuery query) {
((Observable)query).addObserver(this);
queries.add(query);
@@ -101,7 +102,7 @@
notifyObservers();
}
- public void removeQuery(FCPTransferQuery query) {
+ public synchronized void removeQuery(FCPTransferQuery query) {
((Observable)query).deleteObserver(this);
queries.remove(query);
@@ -110,10 +111,29 @@
}
- public FCPTransferQuery getQuery(int row) {
- return (FCPTransferQuery)queries.get(row);
+ public synchronized FCPTransferQuery getQuery(int row) {
+ try {
+ return (FCPTransferQuery)queries.get(row);
+ } catch(java.lang.ArrayIndexOutOfBoundsException e) {
+ Logger.notice(this, "Query not found, row: "+row);
+ return null;
+ }
}
+ /**
+ * returns a *copy*
+ */
+ public synchronized Vector getQueries() {
+ Vector newVect = new Vector();
+
+ for(Iterator queryIt = queries.iterator() ;
+ queryIt.hasNext();) {
+ newVect.add(queryIt.next());
+ }
+
+ return newVect;
+ }
+
public void notifyObservers() {
TableModelListener[] listeners = getTableModelListeners();