Author: michiel
Date: 2010-07-09 15:31:00 +0200 (Fri, 09 Jul 2010)
New Revision: 42844

Added:
   mmbase/trunk/applications/clustering/converter.sh
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
Modified:
   mmbase/trunk/applications/clustering/pom.xml
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
   
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
Log:
MMB-1972, MMB-1971

Added: mmbase/trunk/applications/clustering/converter.sh
===================================================================
--- mmbase/trunk/applications/clustering/converter.sh                           
(rev 0)
+++ mmbase/trunk/applications/clustering/converter.sh   2010-07-09 13:31:00 UTC 
(rev 42844)
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+export 
CLASSPATH=~/.m2/repository/org/mmbase/mmbase-utils/2.0-SNAPSHOT/mmbase-utils-2.0-SNAPSHOT.jar:~/.m2/repository/org/mmbase/mmbase-clustering/2.0-SNAPSHOT/mmbase-clustering-2.0-SNAPSHOT-classes.jar
+
+echo ${CLASSPATH}
+
+java org.mmbase.clustering.Converter $@
\ No newline at end of file


Property changes on: mmbase/trunk/applications/clustering/converter.sh
___________________________________________________________________
Name: svn:executable
   + *

Modified: mmbase/trunk/applications/clustering/pom.xml
===================================================================
--- mmbase/trunk/applications/clustering/pom.xml        2010-07-09 13:29:04 UTC 
(rev 42843)
+++ mmbase/trunk/applications/clustering/pom.xml        2010-07-09 13:31:00 UTC 
(rev 42844)
@@ -22,7 +22,12 @@
     for communication between machines, including one which depends on the 
'jgroups' software.
   </description>
 
+  <properties>
+    <jar.mainClass>org.mmbase.clustering.Converter</jar.mainClass>
+  </properties>
 
+
+
   <dependencies>
     <!-- TODO: Remove dependency on core -->
     <dependency>

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
        2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java
        2010-07-09 13:31:00 UTC (rev 42844)
@@ -38,10 +38,14 @@
     protected final Statistics receive = new Statistics();
     protected final Statistics send    = new Statistics();
 
-    /** Queue with messages to send to other MMBase instances */
-    protected BlockingQueue<byte[]> nodesToSend = new 
LinkedBlockingQueue<byte[]>(64);
-    /** Queue with received messages from other MMBase instances */
-    protected BlockingQueue<byte[]> nodesToSpawn = new 
LinkedBlockingQueue<byte[]>(64);
+    /**
+     * Queue with messages to send to other MMBase instances
+     */
+    protected final BlockingQueue<byte[]> nodesToSend = new 
LinkedBlockingQueue<byte[]>(64);
+    /**
+     * Queue with received messages from other MMBase instances
+     */
+    protected final BlockingQueue<byte[]> nodesToSpawn = new 
LinkedBlockingQueue<byte[]>(64);
 
     /** Thread which processes the messages */
     protected Thread kicker = null;

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
      2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java
      2010-07-09 13:31:00 UTC (rev 42844)
@@ -42,7 +42,7 @@
         if(clusterManagerClassName != null){
             clusterManager = findInstance(clusterManagerClassName);
             EventManager.getInstance().addEventListener(clusterManager);
-        }else{
+        } else {
             log.error("Parameter 'ClusterManagerImplementation' is missing 
from config file. can not load clustering");
         }
 

Added: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
                             (rev 0)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java
     2010-07-09 13:31:00 UTC (rev 42844)
@@ -0,0 +1,102 @@
+package org.mmbase.clustering;
+import java.util.*;
+import java.net.*;
+import java.util.concurrent.*;
+/**
+ * Main class of this class starts up a unicast sender and listener
+ * and multicast sender and listener and connects those, effectively
+ * allowing for one 'out lyer' server which via this small little
+ * program connected to the local multicast network.
+ * @author Michiel Meeuwissen
+ */
+
+public class Converter {
+
+
+    public static void main(String[] argv) throws Exception {
+
+
+        
org.mmbase.util.logging.SimpleTimeStampImpl.configure("org.mmbase.clustering", 
"stdout,debug");
+
+        Map<String, String> argMap = new HashMap<String, String>();
+        argMap.put("unicastListen", InetAddress.getLocalHost().getHostName() + 
":4123");
+        argMap.put("unicastSend", "otherhost:4123:mmbase");
+
+        argMap.put("multicast", 
org.mmbase.clustering.multicast.Multicast.HOST_DEFAULT + ":" + 
org.mmbase.clustering.multicast.Multicast.PORT_DEFAULT);
+        for (String arg : argv) {
+            String[] split = arg.split("=", 2);
+            if (split.length == 2) {
+                if (argMap.containsKey(split[0])) {
+                    argMap.put(split[0], split[1]);
+                } else {
+                    System.err.println("Unrecognized option " + arg + " 
Options are " + argMap);
+                    System.exit(1);
+                }
+            } else {
+                System.err.println("Unrecognized option " + arg + " Options 
are " + argMap);
+                System.exit(2);
+            }
+        }
+
+
+        final BlockingQueue<byte[]> uniToMultiNodes =  new 
LinkedBlockingQueue<byte[]>(64);
+        final BlockingQueue<byte[]> multiToUniNodes =  new 
LinkedBlockingQueue<byte[]>(64);
+
+        String[] unicast = argMap.get("unicastListen").split(":");
+        final String unicastListenHost = unicast[0];
+        final int unicastListenPort    = Integer.parseInt(unicast[1]);
+        final int unicastListenVersion = 2;
+
+
+        final List<org.mmbase.clustering.unicast.ChangesSender.OtherMachine> 
unicastSenders
+            = new 
ArrayList<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>();
+        {
+            String[] unicastHost = argMap.get("unicastSend").split(",");
+            for (String unicastString : unicastHost) {
+                if (unicastString.length() > 0) {
+                    String[] unicastSend = unicastString.split(":", 3);
+                    unicastSenders.add(new 
org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0], 
unicastSend.length > 2 ? unicastSend[2] : null, 
Integer.parseInt(unicastSend[1]), 2));
+                }
+            }
+        }
+
+
+        int dpsize = 64 * 1024;
+        String[] multicast = argMap.get("multicast").split(":");
+        final String multicastHost = multicast[0];
+        final int multicastPort    = Integer.parseInt(multicast[1]);
+        int multicastTimeToLive = 1;
+
+        Statistics stats = new Statistics();
+
+        Runnable uniCastReceiver   = new 
org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, 
unicastListenPort, uniToMultiNodes, 2);
+        Runnable uniCastSender     = new 
org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, 
multiToUniNodes, stats, 2) {
+                @Override
+                protected Iterable<OtherMachine> getOtherMachines() {
+                    return unicastSenders;
+                }
+                @Override
+                protected int remove(OtherMachine mach) {
+                    return 0;
+                }
+            };
+
+
+        Runnable multiCastReceiver   = new 
org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort, 
dpsize, multiToUniNodes);
+        org.mmbase.clustering.multicast.ChangesSender multiCastSender
+            = new org.mmbase.clustering.multicast.ChangesSender(multicastHost, 
multicastPort, multicastTimeToLive, uniToMultiNodes, stats);
+        multiCastSender.getSocket().setLoopbackMode(true);
+
+        synchronized(Converter.class) {
+
+
+            System.out.println("Waiting for interrupt");
+            try {
+                Converter.class.wait();
+                System.out.println("INTERRUPTED");
+            } catch (InterruptedException ie) {
+                System.out.println(ie.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
     2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java
     2010-07-09 13:31:00 UTC (rev 42844)
@@ -10,9 +10,9 @@
 package org.mmbase.clustering.multicast;
 
 import java.net.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
 
-import org.mmbase.module.core.MMBaseContext;
+import org.mmbase.util.*;
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
 
@@ -29,8 +29,7 @@
 
     private static final Logger log = 
Logging.getLoggerInstance(ChangesReceiver.class);
 
-    /** Thread which sends the messages */
-    private Thread kicker = null;
+    private Future future = null;
 
     /** Queue with messages received from other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSpawn;
@@ -55,7 +54,7 @@
      * @param nodesToSpawn Queue of received messages
      * @throws UnknownHostException when multicastHost is not found
      */
-    ChangesReceiver(String multicastHost, int mport, int dpsize, 
BlockingQueue<byte[]> nodesToSpawn)  throws UnknownHostException {
+    public ChangesReceiver(String multicastHost, int mport, int dpsize, 
BlockingQueue<byte[]> nodesToSpawn)  throws UnknownHostException {
         this.mport = mport;
         this.dpsize = dpsize;
         this.nodesToSpawn = nodesToSpawn;
@@ -63,8 +62,8 @@
         this.start();
     }
 
-    private  void start() {
-        if (kicker == null && ia != null) {
+    void start() {
+        if (future == null && ia != null) {
             try {
                 ms = new MulticastSocket(mport);
                 ms.joinGroup(ia);
@@ -72,7 +71,8 @@
                 log.error(e.getMessage(), e);
             }
             if (ms != null) {
-                kicker = MMBaseContext.startThread(this, "MulticastReceiver");
+                future = ThreadPools.jobsExecutor.submit(this);
+                ThreadPools.identify(future, "MulticastReceiver");
                 log.debug("MulticastReceiver started");
             }
         }
@@ -87,15 +87,20 @@
         } catch (Exception e) {
             // nothing
         }
-        if (kicker != null) {
-            kicker.interrupt();
-            kicker = null;
+        if (future != null) {
+            try {
+                future.cancel(true);
+                future = null;
+            } catch (Throwable t) {
+            }
         } else {
             log.service("Cannot stop thread, because it is null");
         }
     }
 
+
     public void run() {
+        log.info("MultiCast receiving on " + ms +  " " + ia + ":" + mport);
         // create a datapackage to receive all messages
         byte[] buffer = new byte[dpsize];
         DatagramPacket dp = new DatagramPacket(buffer, dpsize);
@@ -110,10 +115,10 @@
                 // That's not what we want. Especially when falling back to 
legacy, this is translated to a String.
                 // which otherwise gets dpsize length (64k!)
                 System.arraycopy(dp.getData(), 0, message, 0, dp.getLength());
+                nodesToSpawn.offer(message);
                 if (log.isDebugEnabled()) {
-                    log.debug("RECEIVED=> " + dp.getLength() + " bytes from " 
+ dp.getAddress());
+                    log.debug("Multicast RECEIVED=> " + dp.getLength() + " 
bytes from " + dp.getAddress() + " queue " + nodesToSpawn.size());
                 }
-                nodesToSpawn.offer(message);
             } catch (java.net.SocketException se) {
                 // generally happens on shutdown (ms==null)
                 // if not log it as an error

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
       2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java
       2010-07-09 13:31:00 UTC (rev 42844)
@@ -13,9 +13,9 @@
 
 import java.net.*;
 import java.io.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
 
-import org.mmbase.module.core.MMBaseContext;
+import org.mmbase.util.ThreadPools;
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
 
@@ -34,8 +34,7 @@
 
     private final Statistics send;
 
-    /** Thread which sends the messages */
-    private Thread kicker = null;
+    private Future future = null;
 
     /** Queue with messages to send to other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSend;
@@ -60,7 +59,7 @@
      * @param send Statistics object in which to administer duration costs
      * @throws UnknownHostException when multicastHost is not found
      */
-    ChangesSender(String multicastHost, int mport, int mTTL, 
BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException 
 {
+    public ChangesSender(String multicastHost, int mport, int mTTL, 
BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException 
 {
         this.mport = mport;
         this.mTTL = mTTL;
         this.nodesToSend = nodesToSend;
@@ -69,17 +68,18 @@
         this.start();
     }
 
-    private void start() {
-        if (kicker == null && ia != null) {
+    void start() {
+        if (future == null && ia != null) {
             try {
                 ms = new MulticastSocket();
                 ms.joinGroup(ia);
                 ms.setTimeToLive(mTTL);
             } catch(Exception e) {
-                log.error(Logging.stackTrace(e));
+                log.error(e.getMessage(), e);
             }
 
-            kicker = MMBaseContext.startThread(this, "MulticastSender");
+            future = ThreadPools.jobsExecutor.submit(this);
+            ThreadPools.identify(future, "MulticastSender");
             log.debug("MulticastSender started");
         }
     }
@@ -92,17 +92,22 @@
             // nothing
         }
         ms = null;
-        if (kicker != null) {
-            kicker.interrupt();
-            kicker.setPriority(Thread.MIN_PRIORITY);
-            kicker = null;
+        if (future != null) {
+            try {
+                future.cancel(true);
+            } catch (Throwable t) {
+            }
+            future = null;
         } else {
             log.service("Cannot stop thread, because it is null");
         }
     }
+    public MulticastSocket getSocket() {
+        return ms;
+    }
 
     public void run() {
-        log.debug("Started sending");
+        log.info("MultiCast sending on " + ms + " " + ia + ":" + mport);
         while(ms != null) {
             try {
                 byte[] data = nodesToSend.take();

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
   2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java
   2010-07-09 13:31:00 UTC (rev 42844)
@@ -33,17 +33,20 @@
 
     public static final String CONFIG_FILE = "multicast.xml";
 
+    public static final String HOST_DEFAULT = "ALL-SYSTEMS.MCAST.NET";
+    public static final int    PORT_DEFAULT = 16080;
+
     /**
      * Defines what 'channel' we are talking to when using multicast.
      */
-    private String multicastHost = "ALL-SYSTEMS.MCAST.NET";
+    private String multicastHost = HOST_DEFAULT;
 
     /**
      * Determines on what port does this multicast talking between nodes take 
place.
      * This can be set to any port but check if something else on
      * your network is allready using multicast when you have problems.
      */
-    private int multicastPort = 4243;
+    private int multicastPort = PORT_DEFAULT;
 
     /** Determines the Time To Live for a multicast datapacket */
     private int multicastTTL = 1;
@@ -123,11 +126,13 @@
         } else {
             try {
                 mcs = new ChangesSender(multicastHost, multicastPort, 
multicastTTL, nodesToSend, send);
+                mcs.start();
             } catch (java.net.UnknownHostException e) {
                 log.error(e);
             }
             try {
                 mcr = new ChangesReceiver(multicastHost, multicastPort, 
dpsize, nodesToSpawn);
+                mcr.start();
             } catch (java.net.UnknownHostException e) {
                 log.error(e);
             }

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
       2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java
       2010-07-09 13:31:00 UTC (rev 42844)
@@ -11,9 +11,10 @@
 
 import java.io.*;
 import java.net.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
 
-import org.mmbase.core.util.DaemonThread;
+
+import org.mmbase.util.ThreadPools;
 import org.mmbase.util.logging.Logger;
 import org.mmbase.util.logging.Logging;
 
@@ -29,14 +30,15 @@
 
     private static final Logger log = 
Logging.getLoggerInstance(ChangesReceiver.class);
 
-    /** Thread which sends the messages */
-    private Thread kicker = null;
+    private Future future = null;
 
     /** Queue with messages received from other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSpawn;
 
     private final ServerSocket serverSocket;
 
+    private final int version;
+
     /**
      * Construct UniCast Receiver
      * @param unicastHost host of unicast connection
@@ -44,29 +46,27 @@
      * @param nodesToSpawn Queue of received messages
      * @throws IOException when server socket failrf to listen
      */
-    ChangesReceiver(String unicastHost, int unicastPort, BlockingQueue<byte[]> 
nodesToSpawn) throws IOException {
+    public ChangesReceiver(String unicastHost, int unicastPort, 
BlockingQueue<byte[]> nodesToSpawn, int version) throws IOException {
         this.nodesToSpawn = nodesToSpawn;
         this.serverSocket = new ServerSocket();
         SocketAddress address = new InetSocketAddress(unicastHost, 
unicastPort);
         serverSocket.bind(address);
-        log.info("Listening to " + address);
+        this.version = version;
         this.start();
     }
 
     private void start() {
-        if (kicker == null) {
-            kicker = new DaemonThread(this, "UnicastReceiver");
-            kicker.start();
-            log.debug("UnicastReceiver started");
+        if (future == null) {
+            future = ThreadPools.jobsExecutor.submit(this);
+            ThreadPools.identify(future, "UnicastReceiver");
         }
     }
 
     void stop() {
-        if (kicker != null) {
+        if (future != null) {
             try {
-                kicker.interrupt();
-                kicker.setPriority(Thread.MIN_PRIORITY);
-                kicker = null;
+                future.cancel(true);
+                future = null;
             } catch (Throwable t) {
             }
             try {
@@ -80,35 +80,62 @@
     }
 
     public void run() {
+        log.info("Unicast listening started on " + serverSocket + " (v:" + 
version + ")");
         try {
-            while (kicker!=null) {
+            while (true) {
+                if (Thread.currentThread().isInterrupted()) break;
                 Socket socket = null;
-                InputStream reader = null;
+                DataInputStream reader = null;
                 try {
                     socket = serverSocket.accept();
-                    reader = new BufferedInputStream(socket.getInputStream());
-                    ByteArrayOutputStream writer = new ByteArrayOutputStream();
-                    int size = 0;
-                    //this buffer has nothing to do with the OS buffer
-                    byte[] buffer = new byte[1024];
+                    log.debug("" + socket);
 
-                    while ((size = reader.read(buffer)) != -1) {
-                        if (writer != null) {
-                          writer.write(buffer, 0, size);
-                          writer.flush();
-                       }
+                    reader = new DataInputStream(socket.getInputStream());
+
+                    if (version > 1) {
+                        int listSize = reader.readInt();
+                        log.debug("Will read " + listSize + " events");
+
+                        for (int i = 0; i < listSize; i++) {
+                            int arraySize = reader.readInt();
+                            log.debug("Size of event " + i + ": " + arraySize);
+                            ByteArrayOutputStream writer = new 
ByteArrayOutputStream();
+                            //this buffer has nothing to do with the OS buffer
+                            byte[] buffer = new byte[arraySize];
+                            reader.read(buffer);
+                            if (writer != null) {
+                                writer.write(buffer, 0, arraySize);
+                                writer.flush();
+                            }
+                            // maybe we should use encoding here?
+                            byte[] message = writer.toByteArray();
+                            if (log.isDebugEnabled()) {
+                                log.debug("unicase RECEIVED=>" + message);
+                            }
+                            nodesToSpawn.offer(message);
+                        }
+                    } else {
+                        ByteArrayOutputStream writer = new 
ByteArrayOutputStream();
+                        int size = 0;
+                        //this buffer has nothing to do with the OS buffer
+                        byte[] buffer = new byte[1024];
+                        while ((size = reader.read(buffer)) != -1) {
+                            if (writer != null) {
+                                writer.write(buffer, 0, size);
+                                writer.flush();
+                            }
+                        }
+                        byte[] message = writer.toByteArray();
+                        if (log.isDebugEnabled()) {
+                            log.debug("RECEIVED=>" + message);
+                        }
+                        nodesToSpawn.offer(message);
                     }
-                    // maybe we should use encoding here?
-                    byte[] message = writer.toByteArray();
-                    if (log.isDebugEnabled()) {
-                        log.debug("RECEIVED=>" + message);
-                    }
-                    nodesToSpawn.offer(message);
                 } catch (SocketException e) {
                     log.warn(e);
                     continue;
                 } catch (Exception e) {
-                    log.error(e);
+                    log.error(e.getMessage(), e);
                 } finally {
                     if (reader != null) {
                         try {

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java
 2010-07-09 13:31:00 UTC (rev 42844)
@@ -14,9 +14,9 @@
 import java.io.IOException;
 import java.net.*;
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
 
-import org.mmbase.core.util.DaemonThread;
+import org.mmbase.util.ThreadPools;
 import org.mmbase.module.builders.MMServers;
 import org.mmbase.module.core.*;
 
@@ -28,6 +28,7 @@
  * sending queue over unicast connections
  *
  * @author Nico Klasens
+ * @author Michiel Meeuwissen
  * @version $Id$
  */
 public class ChangesSender implements Runnable {
@@ -36,8 +37,7 @@
 
     private final Statistics send;
 
-    /** Thread which sends the messages */
-    private Thread kicker = null;
+    private Future future = null;
 
     /** Queue with messages to send to other MMBase instances */
     private final BlockingQueue<byte[]> nodesToSend;
@@ -56,6 +56,8 @@
     /** Interval of servers change their state */
     private long serverInterval;
 
+    private int version = 1;
+
     /**
      * Construct UniCast Sender
      * @param configuration configuration of unicast
@@ -64,87 +66,125 @@
      * @param nodesToSend Queue of messages to send
      * @param send Statistics
      */
-    ChangesSender(Map<String,String> configuration, int unicastPort, int 
unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send) {
+    public ChangesSender(Map<String,String> configuration, int unicastPort, 
int unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send, int 
version) {
         this.nodesToSend = nodesToSend;
         this.configuration = configuration;
         this.defaultUnicastPort = unicastPort;
         this.unicastTimeout = unicastTimeout;
         this.send = send;
+        this.version = version;
         this.start();
     }
 
-    private  void start() {
-        if (kicker == null) {
-            kicker = new DaemonThread(this, "UnicastSender");
-            kicker.start();
-            log.debug("UnicastSender started");
+    void start() {
+        if (future == null) {
+            future = ThreadPools.jobsExecutor.submit(this);
+            ThreadPools.identify(future, "UnicastSender");
         }
     }
 
     void stop() {
-        if (kicker != null) {
-            kicker.interrupt();
-            kicker.setPriority(Thread.MIN_PRIORITY);
-            kicker = null;
+        if (future != null) {
+            try {
+                future.cancel(true);
+                future = null;
+            } catch (Throwable t) {
+            }
         } else {
             log.service("Cannot stop thread, because it is null");
         }
     }
 
+
+    public static class OtherMachine {
+        public final String host;
+        public final String machineName;
+        public final int unicastPort;
+        public final int version;
+        public OtherMachine(String host, String machineName, int unicastPort, 
int version) {
+            this.host = host;
+            this.machineName = machineName;
+            this.unicastPort = unicastPort;
+            this.version = version;
+
+        }
+        @Override
+        public String toString() {
+            return host + ":" + unicastPort + " (v:" + version + ")";
+        }
+    }
+
     // javadoc inherited
     public void run() {
-        while(kicker != null) {
+        log.info("Unicast sending to " + getOtherMachines());
+        while(true) {
+            if (Thread.currentThread().isInterrupted()) break;
             try {
-                byte[] data = nodesToSend.take();
+                List<byte[]> data = new ArrayList<byte[]>();
+                data.add(nodesToSend.take()); // at least one
+                if (version > 1) {
+                    nodesToSend.drainTo(data);
+                }
                 long startTime = System.currentTimeMillis();
-                List<MMObjectNode> servers = getActiveServers();
-                for (int i = 0; i < servers.size(); i++) {
-                    MMObjectNode node = servers.get(i);
-                    if (node != null) {
-                        String hostname = node.getStringValue("host");
-                        String machinename = node.getStringValue("name");
-
-                        int unicastPort = defaultUnicastPort;
-                        String specificPort = configuration.get(machinename + 
".unicastport");
-                        if (specificPort != null) {
-                            unicastPort = Integer.parseInt(specificPort);
-                        }
-                        Socket socket = null;
-                        DataOutputStream os = null;
-                        try {
+                log.debug("Send change to " + getOtherMachines());
+                for (OtherMachine machine : getOtherMachines()) {
+                    DataOutputStream os = null;
+                    Socket socket = null;
+                    try {
+                        if (machine.version > 1) {
                             socket = new Socket();
-                            socket.connect(new InetSocketAddress(hostname, 
unicastPort), unicastTimeout);
+                            socket.connect(new InetSocketAddress(machine.host, 
machine.unicastPort), unicastTimeout);
                             os = new 
DataOutputStream(socket.getOutputStream());
-                            os.write(data, 0, data.length);
+                            os.writeInt(data.size());
+                            send.bytes += 4;
+                            for (byte[] d : data) {
+                                os.writeInt(d.length);
+                                send.bytes += 4;
+                                os.write(d, 0, d.length);
+                                send.bytes += d.length;
+                            }
                             os.flush();
-                            if (log.isDebugEnabled()) {
-                                log.debug("SEND=>" + hostname + ":" + 
unicastPort);
+                        } else {
+                            for (byte[] d : data) {
+                                socket = new Socket();
+                                socket.connect(new 
InetSocketAddress(machine.host, machine.unicastPort), unicastTimeout);
+                                os = new 
DataOutputStream(socket.getOutputStream());
+                                os.write(d, 0, d.length);
+                                send.bytes += d.length;
+                                os.flush();
                             }
-                        } catch(SocketTimeoutException ste) {
-                            servers.remove(i);
-                            log.warn("Server timeout: " + hostname + ":" + 
unicastPort + " " + ste + ". Removed " + node + " from active server list.");
-                        } catch (ConnectException ce) {
-                            log.warn("Connect exception: " + hostname + ":" + 
unicastPort + " " + ce + ".");
-                        } catch (IOException e) {
-                            log.error("can't send message to " + hostname + 
":" + unicastPort + " " + e.getMessage() , e);
-                        } finally {
-                            if (os != null) {
-                                try {
-                                    os.close();
-                                } catch (IOException e1) {
-                                }
+                        }
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("SEND=>" + machine + " (" + data.size() 
+ " events)");
+                        }
+                    } catch(SocketTimeoutException ste) {
+                        int removed = remove(machine);
+                        if (removed == 1) {
+                            log.warn("Server timeout: " + machine + " " + ste 
+ ". Removed from active server list.");
+                        } else {
+                            log.error("Server timeout: " + machine + " " + ste 
+ ". Remove from active server list: " + removed);
+                        }
+                    } catch (ConnectException ce) {
+                        log.warn("Connect exception: " + machine + " " + ce + 
".");
+                    } catch (IOException e) {
+                        log.error("can't send message to " + machine + " " + 
e.getMessage() , e);
+                    } finally {
+                        if (os != null) {
+                            try {
+                                os.close();
+                            } catch (IOException e1) {
                             }
-                            if (socket != null) {
-                                try {
-                                    socket.close();
-                                } catch (IOException e1) {
-                                }
+                        }
+                        if (socket != null) {
+                            try {
+                                socket.close();
+                            } catch (IOException e1) {
                             }
                         }
                     }
                 }
                 send.count++;
-                send.bytes += data.length;
                 send.cost += (System.currentTimeMillis() - startTime);
 
             } catch (InterruptedException e) {
@@ -185,4 +225,44 @@
         return activeServers;
     }
 
+    protected int remove(OtherMachine remove) {
+        Iterator<MMObjectNode> i = activeServers.iterator();
+        while (i.hasNext()) {
+            MMObjectNode node = i.next();
+            String hostname    = node.getStringValue("host");
+            String machinename = node.getStringValue("name");
+            if (remove.host.equals(hostname) && 
remove.machineName.equals(machinename)) {
+                i.remove();
+                return 1;
+            }
+        }
+        return 0;
+    }
+
+
+    protected Iterable<OtherMachine> getOtherMachines() {
+        List<OtherMachine> result = new ArrayList<OtherMachine>();
+
+        for (MMObjectNode node : getActiveServers()) {
+            if (node != null) {
+                String hostname    = node.getStringValue("host");
+                String machinename = node.getStringValue("name");
+
+                int unicastPort = defaultUnicastPort;
+                int version = 1;
+                if (configuration != null) {
+                    String specificPort = configuration.get(machinename + 
".unicastport");
+                    if (specificPort != null) {
+                        unicastPort = Integer.parseInt(specificPort);
+                    }
+                    String specificVersion = configuration.get(machinename + 
".version");
+                    if (specificVersion != null) {
+                        version = Integer.parseInt(specificVersion);
+                    }
+                }
+                result.add(new OtherMachine(hostname, machinename, 
unicastPort, version));
+            }
+        }
+        return result;
+    }
 }

Modified: 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
===================================================================
--- 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-09 13:29:04 UTC (rev 42843)
+++ 
mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java
       2010-07-09 13:31:00 UTC (rev 42844)
@@ -41,7 +41,9 @@
     /** Timeout of the connection.*/
     private int unicastTimeout = 10 * 1000;
 
+    private int version = 1;
 
+
     /** Sender which reads the nodesToSend Queue amd puts the message on the 
line */
     private ChangesSender ucs;
     /** Receiver which reads the message from the line and puts message in the 
nodesToSpawn Queue */
@@ -63,7 +65,7 @@
 
 
 
-    public Unicast(){
+    public Unicast() {
         readConfiguration(reader.getProperties());
         start();
     }
@@ -102,9 +104,26 @@
             } catch (Exception e) {}
         }
 
+        {
+
+            String tmpVersion = configuration.get("version");
+            if (tmpVersion != null && !tmpVersion.equals("")) {
+                try {
+                    version = Integer.parseInt(tmpVersion);
+                } catch (Exception e) {}
+            }
+            tmpVersion = 
configuration.get(org.mmbase.module.core.MMBase.getMMBase().getMachineName() + 
".version");
+            if (tmpVersion != null && !tmpVersion.equals("")) {
+                try {
+                    version = Integer.parseInt(tmpVersion);
+                } catch (Exception e) {}
+            }
+        }
+
         log.info("unicast host: "    + unicastHost);
         log.info("unicast port: "    + unicastPort);
         log.info("unicast timeout: " + unicastTimeout);
+        log.info("unicast version: " + version + " (" + (version > 1 ? 
"multiple messages" : "single message") + ")");
 
     }
 
@@ -115,9 +134,9 @@
         if (unicastPort == -1) {
             log.service("Not starting unicast threads because port number 
configured to be -1");
         } else {
-            ucs = new ChangesSender(reader.getProperties(), unicastPort, 
unicastTimeout, nodesToSend, send);
+            ucs = new ChangesSender(reader.getProperties(), unicastPort, 
unicastTimeout, nodesToSend, send, version);
             try {
-                ucr = new ChangesReceiver(unicastHost, unicastPort, 
nodesToSpawn);
+                ucr = new ChangesReceiver(unicastHost, unicastPort, 
nodesToSpawn, version);
             } catch (java.io.IOException ioe) {
                 log.error(ioe);
             }

_______________________________________________
Cvs mailing list
Cvs@lists.mmbase.org
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to