Author: michiel Date: 2010-07-09 15:31:00 +0200 (Fri, 09 Jul 2010) New Revision: 42844
Added: mmbase/trunk/applications/clustering/converter.sh mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java Modified: mmbase/trunk/applications/clustering/pom.xml mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java Log: MMB-1972, MMB-1971 Added: mmbase/trunk/applications/clustering/converter.sh =================================================================== --- mmbase/trunk/applications/clustering/converter.sh (rev 0) +++ mmbase/trunk/applications/clustering/converter.sh 2010-07-09 13:31:00 UTC (rev 42844) @@ -0,0 +1,7 @@ +#!/bin/bash + +export CLASSPATH=~/.m2/repository/org/mmbase/mmbase-utils/2.0-SNAPSHOT/mmbase-utils-2.0-SNAPSHOT.jar:~/.m2/repository/org/mmbase/mmbase-clustering/2.0-SNAPSHOT/mmbase-clustering-2.0-SNAPSHOT-classes.jar + +echo ${CLASSPATH} + +java org.mmbase.clustering.Converter $@ \ No newline at end of file Property changes on: mmbase/trunk/applications/clustering/converter.sh ___________________________________________________________________ Name: svn:executable + * Modified: mmbase/trunk/applications/clustering/pom.xml =================================================================== --- mmbase/trunk/applications/clustering/pom.xml 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/pom.xml 2010-07-09 13:31:00 UTC (rev 42844) @@ -22,7 +22,12 @@ for communication between machines, including one which depends on the 'jgroups' software. </description> + <properties> + <jar.mainClass>org.mmbase.clustering.Converter</jar.mainClass> + </properties> + + <dependencies> <!-- TODO: Remove dependency on core --> <dependency> Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusterManager.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -38,10 +38,14 @@ protected final Statistics receive = new Statistics(); protected final Statistics send = new Statistics(); - /** Queue with messages to send to other MMBase instances */ - protected BlockingQueue<byte[]> nodesToSend = new LinkedBlockingQueue<byte[]>(64); - /** Queue with received messages from other MMBase instances */ - protected BlockingQueue<byte[]> nodesToSpawn = new LinkedBlockingQueue<byte[]>(64); + /** + * Queue with messages to send to other MMBase instances + */ + protected final BlockingQueue<byte[]> nodesToSend = new LinkedBlockingQueue<byte[]>(64); + /** + * Queue with received messages from other MMBase instances + */ + protected final BlockingQueue<byte[]> nodesToSpawn = new LinkedBlockingQueue<byte[]>(64); /** Thread which processes the messages */ protected Thread kicker = null; Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/ClusteringModule.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -42,7 +42,7 @@ if(clusterManagerClassName != null){ clusterManager = findInstance(clusterManagerClassName); EventManager.getInstance().addEventListener(clusterManager); - }else{ + } else { log.error("Parameter 'ClusterManagerImplementation' is missing from config file. can not load clustering"); } Added: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java (rev 0) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/Converter.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -0,0 +1,102 @@ +package org.mmbase.clustering; +import java.util.*; +import java.net.*; +import java.util.concurrent.*; +/** + * Main class of this class starts up a unicast sender and listener + * and multicast sender and listener and connects those, effectively + * allowing for one 'out lyer' server which via this small little + * program connected to the local multicast network. + * @author Michiel Meeuwissen + */ + +public class Converter { + + + public static void main(String[] argv) throws Exception { + + + org.mmbase.util.logging.SimpleTimeStampImpl.configure("org.mmbase.clustering", "stdout,debug"); + + Map<String, String> argMap = new HashMap<String, String>(); + argMap.put("unicastListen", InetAddress.getLocalHost().getHostName() + ":4123"); + argMap.put("unicastSend", "otherhost:4123:mmbase"); + + argMap.put("multicast", org.mmbase.clustering.multicast.Multicast.HOST_DEFAULT + ":" + org.mmbase.clustering.multicast.Multicast.PORT_DEFAULT); + for (String arg : argv) { + String[] split = arg.split("=", 2); + if (split.length == 2) { + if (argMap.containsKey(split[0])) { + argMap.put(split[0], split[1]); + } else { + System.err.println("Unrecognized option " + arg + " Options are " + argMap); + System.exit(1); + } + } else { + System.err.println("Unrecognized option " + arg + " Options are " + argMap); + System.exit(2); + } + } + + + final BlockingQueue<byte[]> uniToMultiNodes = new LinkedBlockingQueue<byte[]>(64); + final BlockingQueue<byte[]> multiToUniNodes = new LinkedBlockingQueue<byte[]>(64); + + String[] unicast = argMap.get("unicastListen").split(":"); + final String unicastListenHost = unicast[0]; + final int unicastListenPort = Integer.parseInt(unicast[1]); + final int unicastListenVersion = 2; + + + final List<org.mmbase.clustering.unicast.ChangesSender.OtherMachine> unicastSenders + = new ArrayList<org.mmbase.clustering.unicast.ChangesSender.OtherMachine>(); + { + String[] unicastHost = argMap.get("unicastSend").split(","); + for (String unicastString : unicastHost) { + if (unicastString.length() > 0) { + String[] unicastSend = unicastString.split(":", 3); + unicastSenders.add(new org.mmbase.clustering.unicast.ChangesSender.OtherMachine(unicastSend[0], unicastSend.length > 2 ? unicastSend[2] : null, Integer.parseInt(unicastSend[1]), 2)); + } + } + } + + + int dpsize = 64 * 1024; + String[] multicast = argMap.get("multicast").split(":"); + final String multicastHost = multicast[0]; + final int multicastPort = Integer.parseInt(multicast[1]); + int multicastTimeToLive = 1; + + Statistics stats = new Statistics(); + + Runnable uniCastReceiver = new org.mmbase.clustering.unicast.ChangesReceiver(unicastListenHost, unicastListenPort, uniToMultiNodes, 2); + Runnable uniCastSender = new org.mmbase.clustering.unicast.ChangesSender(null, 4123, 10 * 1000, multiToUniNodes, stats, 2) { + @Override + protected Iterable<OtherMachine> getOtherMachines() { + return unicastSenders; + } + @Override + protected int remove(OtherMachine mach) { + return 0; + } + }; + + + Runnable multiCastReceiver = new org.mmbase.clustering.multicast.ChangesReceiver(multicastHost, multicastPort, dpsize, multiToUniNodes); + org.mmbase.clustering.multicast.ChangesSender multiCastSender + = new org.mmbase.clustering.multicast.ChangesSender(multicastHost, multicastPort, multicastTimeToLive, uniToMultiNodes, stats); + multiCastSender.getSocket().setLoopbackMode(true); + + synchronized(Converter.class) { + + + System.out.println("Waiting for interrupt"); + try { + Converter.class.wait(); + System.out.println("INTERRUPTED"); + } catch (InterruptedException ie) { + System.out.println(ie.getMessage()); + } + } + } +} \ No newline at end of file Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesReceiver.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -10,9 +10,9 @@ package org.mmbase.clustering.multicast; import java.net.*; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.*; -import org.mmbase.module.core.MMBaseContext; +import org.mmbase.util.*; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -29,8 +29,7 @@ private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); - /** Thread which sends the messages */ - private Thread kicker = null; + private Future future = null; /** Queue with messages received from other MMBase instances */ private final BlockingQueue<byte[]> nodesToSpawn; @@ -55,7 +54,7 @@ * @param nodesToSpawn Queue of received messages * @throws UnknownHostException when multicastHost is not found */ - ChangesReceiver(String multicastHost, int mport, int dpsize, BlockingQueue<byte[]> nodesToSpawn) throws UnknownHostException { + public ChangesReceiver(String multicastHost, int mport, int dpsize, BlockingQueue<byte[]> nodesToSpawn) throws UnknownHostException { this.mport = mport; this.dpsize = dpsize; this.nodesToSpawn = nodesToSpawn; @@ -63,8 +62,8 @@ this.start(); } - private void start() { - if (kicker == null && ia != null) { + void start() { + if (future == null && ia != null) { try { ms = new MulticastSocket(mport); ms.joinGroup(ia); @@ -72,7 +71,8 @@ log.error(e.getMessage(), e); } if (ms != null) { - kicker = MMBaseContext.startThread(this, "MulticastReceiver"); + future = ThreadPools.jobsExecutor.submit(this); + ThreadPools.identify(future, "MulticastReceiver"); log.debug("MulticastReceiver started"); } } @@ -87,15 +87,20 @@ } catch (Exception e) { // nothing } - if (kicker != null) { - kicker.interrupt(); - kicker = null; + if (future != null) { + try { + future.cancel(true); + future = null; + } catch (Throwable t) { + } } else { log.service("Cannot stop thread, because it is null"); } } + public void run() { + log.info("MultiCast receiving on " + ms + " " + ia + ":" + mport); // create a datapackage to receive all messages byte[] buffer = new byte[dpsize]; DatagramPacket dp = new DatagramPacket(buffer, dpsize); @@ -110,10 +115,10 @@ // That's not what we want. Especially when falling back to legacy, this is translated to a String. // which otherwise gets dpsize length (64k!) System.arraycopy(dp.getData(), 0, message, 0, dp.getLength()); + nodesToSpawn.offer(message); if (log.isDebugEnabled()) { - log.debug("RECEIVED=> " + dp.getLength() + " bytes from " + dp.getAddress()); + log.debug("Multicast RECEIVED=> " + dp.getLength() + " bytes from " + dp.getAddress() + " queue " + nodesToSpawn.size()); } - nodesToSpawn.offer(message); } catch (java.net.SocketException se) { // generally happens on shutdown (ms==null) // if not log it as an error Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/ChangesSender.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -13,9 +13,9 @@ import java.net.*; import java.io.*; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.*; -import org.mmbase.module.core.MMBaseContext; +import org.mmbase.util.ThreadPools; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -34,8 +34,7 @@ private final Statistics send; - /** Thread which sends the messages */ - private Thread kicker = null; + private Future future = null; /** Queue with messages to send to other MMBase instances */ private final BlockingQueue<byte[]> nodesToSend; @@ -60,7 +59,7 @@ * @param send Statistics object in which to administer duration costs * @throws UnknownHostException when multicastHost is not found */ - ChangesSender(String multicastHost, int mport, int mTTL, BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException { + public ChangesSender(String multicastHost, int mport, int mTTL, BlockingQueue<byte[]> nodesToSend, Statistics send) throws UnknownHostException { this.mport = mport; this.mTTL = mTTL; this.nodesToSend = nodesToSend; @@ -69,17 +68,18 @@ this.start(); } - private void start() { - if (kicker == null && ia != null) { + void start() { + if (future == null && ia != null) { try { ms = new MulticastSocket(); ms.joinGroup(ia); ms.setTimeToLive(mTTL); } catch(Exception e) { - log.error(Logging.stackTrace(e)); + log.error(e.getMessage(), e); } - kicker = MMBaseContext.startThread(this, "MulticastSender"); + future = ThreadPools.jobsExecutor.submit(this); + ThreadPools.identify(future, "MulticastSender"); log.debug("MulticastSender started"); } } @@ -92,17 +92,22 @@ // nothing } ms = null; - if (kicker != null) { - kicker.interrupt(); - kicker.setPriority(Thread.MIN_PRIORITY); - kicker = null; + if (future != null) { + try { + future.cancel(true); + } catch (Throwable t) { + } + future = null; } else { log.service("Cannot stop thread, because it is null"); } } + public MulticastSocket getSocket() { + return ms; + } public void run() { - log.debug("Started sending"); + log.info("MultiCast sending on " + ms + " " + ia + ":" + mport); while(ms != null) { try { byte[] data = nodesToSend.take(); Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/multicast/Multicast.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -33,17 +33,20 @@ public static final String CONFIG_FILE = "multicast.xml"; + public static final String HOST_DEFAULT = "ALL-SYSTEMS.MCAST.NET"; + public static final int PORT_DEFAULT = 16080; + /** * Defines what 'channel' we are talking to when using multicast. */ - private String multicastHost = "ALL-SYSTEMS.MCAST.NET"; + private String multicastHost = HOST_DEFAULT; /** * Determines on what port does this multicast talking between nodes take place. * This can be set to any port but check if something else on * your network is allready using multicast when you have problems. */ - private int multicastPort = 4243; + private int multicastPort = PORT_DEFAULT; /** Determines the Time To Live for a multicast datapacket */ private int multicastTTL = 1; @@ -123,11 +126,13 @@ } else { try { mcs = new ChangesSender(multicastHost, multicastPort, multicastTTL, nodesToSend, send); + mcs.start(); } catch (java.net.UnknownHostException e) { log.error(e); } try { mcr = new ChangesReceiver(multicastHost, multicastPort, dpsize, nodesToSpawn); + mcr.start(); } catch (java.net.UnknownHostException e) { log.error(e); } Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesReceiver.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -11,9 +11,10 @@ import java.io.*; import java.net.*; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.*; -import org.mmbase.core.util.DaemonThread; + +import org.mmbase.util.ThreadPools; import org.mmbase.util.logging.Logger; import org.mmbase.util.logging.Logging; @@ -29,14 +30,15 @@ private static final Logger log = Logging.getLoggerInstance(ChangesReceiver.class); - /** Thread which sends the messages */ - private Thread kicker = null; + private Future future = null; /** Queue with messages received from other MMBase instances */ private final BlockingQueue<byte[]> nodesToSpawn; private final ServerSocket serverSocket; + private final int version; + /** * Construct UniCast Receiver * @param unicastHost host of unicast connection @@ -44,29 +46,27 @@ * @param nodesToSpawn Queue of received messages * @throws IOException when server socket failrf to listen */ - ChangesReceiver(String unicastHost, int unicastPort, BlockingQueue<byte[]> nodesToSpawn) throws IOException { + public ChangesReceiver(String unicastHost, int unicastPort, BlockingQueue<byte[]> nodesToSpawn, int version) throws IOException { this.nodesToSpawn = nodesToSpawn; this.serverSocket = new ServerSocket(); SocketAddress address = new InetSocketAddress(unicastHost, unicastPort); serverSocket.bind(address); - log.info("Listening to " + address); + this.version = version; this.start(); } private void start() { - if (kicker == null) { - kicker = new DaemonThread(this, "UnicastReceiver"); - kicker.start(); - log.debug("UnicastReceiver started"); + if (future == null) { + future = ThreadPools.jobsExecutor.submit(this); + ThreadPools.identify(future, "UnicastReceiver"); } } void stop() { - if (kicker != null) { + if (future != null) { try { - kicker.interrupt(); - kicker.setPriority(Thread.MIN_PRIORITY); - kicker = null; + future.cancel(true); + future = null; } catch (Throwable t) { } try { @@ -80,35 +80,62 @@ } public void run() { + log.info("Unicast listening started on " + serverSocket + " (v:" + version + ")"); try { - while (kicker!=null) { + while (true) { + if (Thread.currentThread().isInterrupted()) break; Socket socket = null; - InputStream reader = null; + DataInputStream reader = null; try { socket = serverSocket.accept(); - reader = new BufferedInputStream(socket.getInputStream()); - ByteArrayOutputStream writer = new ByteArrayOutputStream(); - int size = 0; - //this buffer has nothing to do with the OS buffer - byte[] buffer = new byte[1024]; + log.debug("" + socket); - while ((size = reader.read(buffer)) != -1) { - if (writer != null) { - writer.write(buffer, 0, size); - writer.flush(); - } + reader = new DataInputStream(socket.getInputStream()); + + if (version > 1) { + int listSize = reader.readInt(); + log.debug("Will read " + listSize + " events"); + + for (int i = 0; i < listSize; i++) { + int arraySize = reader.readInt(); + log.debug("Size of event " + i + ": " + arraySize); + ByteArrayOutputStream writer = new ByteArrayOutputStream(); + //this buffer has nothing to do with the OS buffer + byte[] buffer = new byte[arraySize]; + reader.read(buffer); + if (writer != null) { + writer.write(buffer, 0, arraySize); + writer.flush(); + } + // maybe we should use encoding here? + byte[] message = writer.toByteArray(); + if (log.isDebugEnabled()) { + log.debug("unicase RECEIVED=>" + message); + } + nodesToSpawn.offer(message); + } + } else { + ByteArrayOutputStream writer = new ByteArrayOutputStream(); + int size = 0; + //this buffer has nothing to do with the OS buffer + byte[] buffer = new byte[1024]; + while ((size = reader.read(buffer)) != -1) { + if (writer != null) { + writer.write(buffer, 0, size); + writer.flush(); + } + } + byte[] message = writer.toByteArray(); + if (log.isDebugEnabled()) { + log.debug("RECEIVED=>" + message); + } + nodesToSpawn.offer(message); } - // maybe we should use encoding here? - byte[] message = writer.toByteArray(); - if (log.isDebugEnabled()) { - log.debug("RECEIVED=>" + message); - } - nodesToSpawn.offer(message); } catch (SocketException e) { log.warn(e); continue; } catch (Exception e) { - log.error(e); + log.error(e.getMessage(), e); } finally { if (reader != null) { try { Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/ChangesSender.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -14,9 +14,9 @@ import java.io.IOException; import java.net.*; import java.util.*; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.*; -import org.mmbase.core.util.DaemonThread; +import org.mmbase.util.ThreadPools; import org.mmbase.module.builders.MMServers; import org.mmbase.module.core.*; @@ -28,6 +28,7 @@ * sending queue over unicast connections * * @author Nico Klasens + * @author Michiel Meeuwissen * @version $Id$ */ public class ChangesSender implements Runnable { @@ -36,8 +37,7 @@ private final Statistics send; - /** Thread which sends the messages */ - private Thread kicker = null; + private Future future = null; /** Queue with messages to send to other MMBase instances */ private final BlockingQueue<byte[]> nodesToSend; @@ -56,6 +56,8 @@ /** Interval of servers change their state */ private long serverInterval; + private int version = 1; + /** * Construct UniCast Sender * @param configuration configuration of unicast @@ -64,87 +66,125 @@ * @param nodesToSend Queue of messages to send * @param send Statistics */ - ChangesSender(Map<String,String> configuration, int unicastPort, int unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send) { + public ChangesSender(Map<String,String> configuration, int unicastPort, int unicastTimeout, BlockingQueue<byte[]> nodesToSend, Statistics send, int version) { this.nodesToSend = nodesToSend; this.configuration = configuration; this.defaultUnicastPort = unicastPort; this.unicastTimeout = unicastTimeout; this.send = send; + this.version = version; this.start(); } - private void start() { - if (kicker == null) { - kicker = new DaemonThread(this, "UnicastSender"); - kicker.start(); - log.debug("UnicastSender started"); + void start() { + if (future == null) { + future = ThreadPools.jobsExecutor.submit(this); + ThreadPools.identify(future, "UnicastSender"); } } void stop() { - if (kicker != null) { - kicker.interrupt(); - kicker.setPriority(Thread.MIN_PRIORITY); - kicker = null; + if (future != null) { + try { + future.cancel(true); + future = null; + } catch (Throwable t) { + } } else { log.service("Cannot stop thread, because it is null"); } } + + public static class OtherMachine { + public final String host; + public final String machineName; + public final int unicastPort; + public final int version; + public OtherMachine(String host, String machineName, int unicastPort, int version) { + this.host = host; + this.machineName = machineName; + this.unicastPort = unicastPort; + this.version = version; + + } + @Override + public String toString() { + return host + ":" + unicastPort + " (v:" + version + ")"; + } + } + // javadoc inherited public void run() { - while(kicker != null) { + log.info("Unicast sending to " + getOtherMachines()); + while(true) { + if (Thread.currentThread().isInterrupted()) break; try { - byte[] data = nodesToSend.take(); + List<byte[]> data = new ArrayList<byte[]>(); + data.add(nodesToSend.take()); // at least one + if (version > 1) { + nodesToSend.drainTo(data); + } long startTime = System.currentTimeMillis(); - List<MMObjectNode> servers = getActiveServers(); - for (int i = 0; i < servers.size(); i++) { - MMObjectNode node = servers.get(i); - if (node != null) { - String hostname = node.getStringValue("host"); - String machinename = node.getStringValue("name"); - - int unicastPort = defaultUnicastPort; - String specificPort = configuration.get(machinename + ".unicastport"); - if (specificPort != null) { - unicastPort = Integer.parseInt(specificPort); - } - Socket socket = null; - DataOutputStream os = null; - try { + log.debug("Send change to " + getOtherMachines()); + for (OtherMachine machine : getOtherMachines()) { + DataOutputStream os = null; + Socket socket = null; + try { + if (machine.version > 1) { socket = new Socket(); - socket.connect(new InetSocketAddress(hostname, unicastPort), unicastTimeout); + socket.connect(new InetSocketAddress(machine.host, machine.unicastPort), unicastTimeout); os = new DataOutputStream(socket.getOutputStream()); - os.write(data, 0, data.length); + os.writeInt(data.size()); + send.bytes += 4; + for (byte[] d : data) { + os.writeInt(d.length); + send.bytes += 4; + os.write(d, 0, d.length); + send.bytes += d.length; + } os.flush(); - if (log.isDebugEnabled()) { - log.debug("SEND=>" + hostname + ":" + unicastPort); + } else { + for (byte[] d : data) { + socket = new Socket(); + socket.connect(new InetSocketAddress(machine.host, machine.unicastPort), unicastTimeout); + os = new DataOutputStream(socket.getOutputStream()); + os.write(d, 0, d.length); + send.bytes += d.length; + os.flush(); } - } catch(SocketTimeoutException ste) { - servers.remove(i); - log.warn("Server timeout: " + hostname + ":" + unicastPort + " " + ste + ". Removed " + node + " from active server list."); - } catch (ConnectException ce) { - log.warn("Connect exception: " + hostname + ":" + unicastPort + " " + ce + "."); - } catch (IOException e) { - log.error("can't send message to " + hostname + ":" + unicastPort + " " + e.getMessage() , e); - } finally { - if (os != null) { - try { - os.close(); - } catch (IOException e1) { - } + } + + if (log.isDebugEnabled()) { + log.debug("SEND=>" + machine + " (" + data.size() + " events)"); + } + } catch(SocketTimeoutException ste) { + int removed = remove(machine); + if (removed == 1) { + log.warn("Server timeout: " + machine + " " + ste + ". Removed from active server list."); + } else { + log.error("Server timeout: " + machine + " " + ste + ". Remove from active server list: " + removed); + } + } catch (ConnectException ce) { + log.warn("Connect exception: " + machine + " " + ce + "."); + } catch (IOException e) { + log.error("can't send message to " + machine + " " + e.getMessage() , e); + } finally { + if (os != null) { + try { + os.close(); + } catch (IOException e1) { } - if (socket != null) { - try { - socket.close(); - } catch (IOException e1) { - } + } + if (socket != null) { + try { + socket.close(); + } catch (IOException e1) { } } } } send.count++; - send.bytes += data.length; send.cost += (System.currentTimeMillis() - startTime); } catch (InterruptedException e) { @@ -185,4 +225,44 @@ return activeServers; } + protected int remove(OtherMachine remove) { + Iterator<MMObjectNode> i = activeServers.iterator(); + while (i.hasNext()) { + MMObjectNode node = i.next(); + String hostname = node.getStringValue("host"); + String machinename = node.getStringValue("name"); + if (remove.host.equals(hostname) && remove.machineName.equals(machinename)) { + i.remove(); + return 1; + } + } + return 0; + } + + + protected Iterable<OtherMachine> getOtherMachines() { + List<OtherMachine> result = new ArrayList<OtherMachine>(); + + for (MMObjectNode node : getActiveServers()) { + if (node != null) { + String hostname = node.getStringValue("host"); + String machinename = node.getStringValue("name"); + + int unicastPort = defaultUnicastPort; + int version = 1; + if (configuration != null) { + String specificPort = configuration.get(machinename + ".unicastport"); + if (specificPort != null) { + unicastPort = Integer.parseInt(specificPort); + } + String specificVersion = configuration.get(machinename + ".version"); + if (specificVersion != null) { + version = Integer.parseInt(specificVersion); + } + } + result.add(new OtherMachine(hostname, machinename, unicastPort, version)); + } + } + return result; + } } Modified: mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java =================================================================== --- mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java 2010-07-09 13:29:04 UTC (rev 42843) +++ mmbase/trunk/applications/clustering/src/main/java/org/mmbase/clustering/unicast/Unicast.java 2010-07-09 13:31:00 UTC (rev 42844) @@ -41,7 +41,9 @@ /** Timeout of the connection.*/ private int unicastTimeout = 10 * 1000; + private int version = 1; + /** Sender which reads the nodesToSend Queue amd puts the message on the line */ private ChangesSender ucs; /** Receiver which reads the message from the line and puts message in the nodesToSpawn Queue */ @@ -63,7 +65,7 @@ - public Unicast(){ + public Unicast() { readConfiguration(reader.getProperties()); start(); } @@ -102,9 +104,26 @@ } catch (Exception e) {} } + { + + String tmpVersion = configuration.get("version"); + if (tmpVersion != null && !tmpVersion.equals("")) { + try { + version = Integer.parseInt(tmpVersion); + } catch (Exception e) {} + } + tmpVersion = configuration.get(org.mmbase.module.core.MMBase.getMMBase().getMachineName() + ".version"); + if (tmpVersion != null && !tmpVersion.equals("")) { + try { + version = Integer.parseInt(tmpVersion); + } catch (Exception e) {} + } + } + log.info("unicast host: " + unicastHost); log.info("unicast port: " + unicastPort); log.info("unicast timeout: " + unicastTimeout); + log.info("unicast version: " + version + " (" + (version > 1 ? "multiple messages" : "single message") + ")"); } @@ -115,9 +134,9 @@ if (unicastPort == -1) { log.service("Not starting unicast threads because port number configured to be -1"); } else { - ucs = new ChangesSender(reader.getProperties(), unicastPort, unicastTimeout, nodesToSend, send); + ucs = new ChangesSender(reader.getProperties(), unicastPort, unicastTimeout, nodesToSend, send, version); try { - ucr = new ChangesReceiver(unicastHost, unicastPort, nodesToSpawn); + ucr = new ChangesReceiver(unicastHost, unicastPort, nodesToSpawn, version); } catch (java.io.IOException ioe) { log.error(ioe); } _______________________________________________ Cvs mailing list Cvs@lists.mmbase.org http://lists.mmbase.org/mailman/listinfo/cvs