Author: jflesch
Date: 2007-04-14 21:11:26 +0000 (Sat, 14 Apr 2007)
New Revision: 12729
Added:
trunk/apps/Thaw/src/thaw/fcp/FCPTestDDA.java
Modified:
trunk/apps/Thaw/src/thaw/core/Logger.java
trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java
trunk/apps/Thaw/src/thaw/fcp/FCPQuery.java
Log:
Implement TestDDA for the ClientGet request (need testing !)
Modified: trunk/apps/Thaw/src/thaw/core/Logger.java
===================================================================
--- trunk/apps/Thaw/src/thaw/core/Logger.java 2007-04-14 21:03:34 UTC (rev
12728)
+++ trunk/apps/Thaw/src/thaw/core/Logger.java 2007-04-14 21:11:26 UTC (rev
12729)
@@ -22,7 +22,7 @@
* 2 or more is recommanded.
* 4 or more is unhealthy
*/
- public final static int LOG_LEVEL = 2;
+ public final static int LOG_LEVEL = 3;
private static Vector logListeners = null;
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2007-04-14 21:03:34 UTC
(rev 12728)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2007-04-14 21:11:26 UTC
(rev 12729)
@@ -46,7 +46,9 @@
private boolean noDDA = false;
+ private FCPTestDDA testDDA = null;
+
/**
* See setParameters().
*/
@@ -200,6 +202,23 @@
this.queueManager = queueManager;
+ if
(queueManager.getQueryManager().getConnection().isLocalSocket()
+ && !noDDA
+ && (destinationDir != null || finalPath != null)) {
+
+ if (destinationDir == null)
+ destinationDir = new
File(finalPath).getAbsoluteFile().getParent();
+
+ testDDA = new FCPTestDDA(destinationDir, false, true);
+ testDDA.addObserver(this);
+ return testDDA.start(queueManager);
+ }
+
+ return sendClientGet();
+ }
+
+ public boolean sendClientGet() {
+
if (finalPath == null && destinationDir == null) {
if ((destinationDir =
System.getProperty("java.io.tmpdir")) == null) {
Logger.error(this, "Unable to find temporary
directory ! Will create troubles !");
@@ -262,7 +281,16 @@
public void update(final Observable o, final Object arg) {
+ if (o == testDDA) {
+ if (!testDDA.mayTheNodeWrite())
+ noDDA = true;
+ sendClientGet();
+
+ return;
+ }
+
+
FCPQueryManager queryManager = null;
final FCPMessage message = (FCPMessage)arg;
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java 2007-04-14 21:03:34 UTC
(rev 12728)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPMessage.java 2007-04-14 21:11:26 UTC
(rev 12729)
@@ -63,7 +63,7 @@
}
if("ProtocolError".equals( getMessageName() )) {
- Logger.notice(this, "PROTOCOL ERROR:"+toString());
+ Logger.notice(this, "PROTOCOL ERROR:\n"+toString());
}
return true;
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQuery.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQuery.java 2007-04-14 21:03:34 UTC (rev
12728)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQuery.java 2007-04-14 21:11:26 UTC (rev
12729)
@@ -20,7 +20,7 @@
/**
* Tell if the query is a download query or an upload query.
* If >= 1 then *must* be Observable and implements FCPTransferQuery.
- * @return 0 : Meaningless ; 1 : Download ; 2 : Upload ; >= 2 : ?
+ * @return 0 : Meaningless ; 1 : Download ; 2 : Upload
*/
public int getQueryType();
Added: trunk/apps/Thaw/src/thaw/fcp/FCPTestDDA.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPTestDDA.java
(rev 0)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPTestDDA.java 2007-04-14 21:11:26 UTC
(rev 12729)
@@ -0,0 +1,206 @@
+package thaw.fcp;
+
+import java.util.Observable;
+import java.util.Observer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+
+import thaw.core.Logger;
+
+
+/**
+ * FCPClientGet do it automagically when using DDA, so you shouldn't have to
bother about it
+ */
+public class FCPTestDDA extends Observable implements FCPQuery, Observer {
+ private String dir;
+ private boolean wantRead;
+ private boolean wantWrite;
+
+ private String testFile;
+
+ private boolean nodeCanRead;
+ private boolean nodeCanWrite;
+
+ private FCPQueueManager queueManager;
+
+
+ public FCPTestDDA(String directory,
+ boolean wantTheNodeToRead,
+ boolean wantTheNodeToWrite) {
+ this.dir = directory;
+ this.wantRead = wantTheNodeToRead;
+ this.wantWrite = wantTheNodeToWrite;
+ }
+
+
+ public boolean start(FCPQueueManager queueManager) {
+ this.queueManager = queueManager;
+
+ FCPMessage msg = new FCPMessage();
+ msg.setMessageName("TestDDARequest");
+ msg.setValue("Directory", dir);
+ msg.setValue("WantReadDirectory", Boolean.toString(wantRead));
+ msg.setValue("WantWriteDirectory", Boolean.toString(wantWrite));
+
+ queueManager.getQueryManager().addObserver(this);
+
+ return queueManager.getQueryManager().writeMessage(msg);
+ }
+
+ public boolean stop(FCPQueueManager queueManager) {
+ /* Red Hot Chili Peppers - Can't stop */
+ return false;
+ }
+
+
+ protected boolean writeFile(String filename, String content) {
+ try {
+ FileOutputStream stream = new
FileOutputStream(filename, false);
+
+ stream.write(content.getBytes());
+ stream.close();
+
+ return true;
+
+ } catch(java.io.FileNotFoundException e) {
+
+ Logger.warning(this, "Unable to write file:
"+e.toString());
+ return false;
+
+ } catch(java.io.IOException e) {
+
+ Logger.warning(this, "Unable to write file:
"+e.toString());
+ return false;
+ }
+ }
+
+
+
+ protected String readFile(String filename) {
+ byte[] raw = new byte[128];
+
+ String data = null;
+
+ try {
+ FileInputStream stream = new FileInputStream(filename);
+
+ data = "";
+
+ while(stream.available() > 0) {
+ stream.read(raw);
+ data += new String(raw);
+ }
+
+ stream.close();
+
+ } catch(java.io.FileNotFoundException e) {
+ Logger.warning(this, "Unable to read file :
"+e.toString());
+ return null;
+ } catch(java.io.IOException e) {
+ Logger.warning(this, "Unable to read file :
"+e.toString());
+ return null;
+ }
+
+ return data;
+ }
+
+
+ protected boolean deleteFile(String filename) {
+ return (new File(filename)).delete();
+ }
+
+
+ public void update(Observable o, Object param) {
+ if (param == null || !(param instanceof FCPMessage))
+ return;
+
+ FCPMessage msg = (FCPMessage)param;
+
+ if ("ProtocolError".equals(msg.getMessageName())) {
+ if ("7".equals(msg.getValue("Code"))) {
+ Logger.warning(this, "Node doesn't support
TestDDA (-> ProtocolError) => DDA desactivated");
+
+
queueManager.getQueryManager().getConnection().setLocalSocket(false);
+
+ nodeCanRead = false;
+ nodeCanWrite = false;
+ setChanged();
+ notifyObservers();
+
+ return;
+ }
+ }
+
+
+ if (!dir.equals(msg.getValue("Directory"))) {
+ /* not for us */
+ return;
+ }
+
+
+ if ("TestDDAReply".equals(msg.getMessageName())) {
+ FCPMessage answer = new FCPMessage();
+ answer.setMessageName("TestDDAResponse");
+
+ answer.setValue("Directory", dir);
+
+ if (wantWrite) {
+ testFile = msg.getValue("WriteFilename");
+ writeFile(testFile,
msg.getValue("ContentToWrite"));
+ }
+
+ if (wantRead) {
+ String data =
readFile(msg.getValue("ReadFilename"));
+
+ if (data == null) {
+ Logger.error(this, "Thaw can't read the
file written by the node !");
+ }
+
+ answer.setValue("ReadContent", data != null ?
data : "bleh");
+ }
+
+ queueManager.getQueryManager().writeMessage(answer);
+ }
+
+
+
+
+ if ("TestDDAComplete".equals(msg.getMessageName())) {
+ nodeCanRead = false;
+ nodeCanWrite = false;
+
+ if (wantRead)
+ nodeCanRead =
Boolean.valueOf(msg.getValue("ReadDirectoryAllowed")).booleanValue();
+ if (wantWrite)
+ nodeCanWrite =
Boolean.valueOf(msg.getValue("WriteDirectoryAllowed")).booleanValue();
+
+ Logger.info(this,
+ "TestDDA : R : " +Boolean.toString(wantRead)
+ + " ; W : "+Boolean.toString(wantWrite));
+
+ if (wantWrite)
+ deleteFile(testFile);
+
+ queueManager.getQueryManager().deleteObserver(this);
+
+ setChanged();
+ notifyObservers();
+ }
+ }
+
+
+ public boolean mayTheNodeRead() {
+ return nodeCanRead;
+ }
+
+ public boolean mayTheNodeWrite() {
+ return nodeCanWrite;
+ }
+
+
+ public int getQueryType() {
+ return 0;
+ }
+}