This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 10.1.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/10.1.x by this push: new ea1d6fe231 Code clean-up - no functional change ea1d6fe231 is described below commit ea1d6fe2317332c3200bc8aa895bd8cc0a9c82a4 Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri May 10 14:48:01 2024 +0100 Code clean-up - no functional change --- .../catalina/tribes/membership/Constants.java | 4 +- .../catalina/tribes/membership/McastService.java | 206 +++++++------ .../tribes/membership/McastServiceImpl.java | 246 ++++++++-------- .../catalina/tribes/membership/MemberImpl.java | 322 ++++++++++----------- .../catalina/tribes/membership/Membership.java | 52 ++-- .../tribes/membership/MembershipProviderBase.java | 2 +- .../tribes/membership/MembershipServiceBase.java | 14 +- .../catalina/tribes/membership/StaticMember.java | 24 +- .../membership/StaticMembershipProvider.java | 56 ++-- .../tribes/membership/StaticMembershipService.java | 28 +- .../membership/cloud/AbstractStreamProvider.java | 54 ++-- .../cloud/CertificateStreamProvider.java | 15 +- .../membership/cloud/CloudMembershipProvider.java | 17 +- .../membership/cloud/CloudMembershipService.java | 36 ++- .../membership/cloud/DNSMembershipProvider.java | 9 +- .../cloud/KubernetesMembershipProvider.java | 30 +- .../tribes/membership/cloud/StreamProvider.java | 12 +- .../membership/cloud/TokenStreamProvider.java | 2 +- 18 files changed, 574 insertions(+), 555 deletions(-) diff --git a/java/org/apache/catalina/tribes/membership/Constants.java b/java/org/apache/catalina/tribes/membership/Constants.java index b0a9dda156..dd5a2f5866 100644 --- a/java/org/apache/catalina/tribes/membership/Constants.java +++ b/java/org/apache/catalina/tribes/membership/Constants.java @@ -19,14 +19,14 @@ package org.apache.catalina.tribes.membership; import org.apache.catalina.tribes.util.Arrays; /** - * Manifest constants for the <code>org.apache.catalina.tribes.membership</code> - * package. + * Manifest constants for the <code>org.apache.catalina.tribes.membership</code> package. * * @author Peter Rossbach */ public class Constants { public static final String Package = "org.apache.catalina.tribes.membership"; + public static void main(String[] args) throws Exception { System.out.println(Arrays.toString("TRIBES-B".getBytes())); System.out.println(Arrays.toString("TRIBES-E".getBytes())); diff --git a/java/org/apache/catalina/tribes/membership/McastService.java b/java/org/apache/catalina/tribes/membership/McastService.java index 4c2c11917c..4768455281 100644 --- a/java/org/apache/catalina/tribes/membership/McastService.java +++ b/java/org/apache/catalina/tribes/membership/McastService.java @@ -38,13 +38,11 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * A <b>membership</b> implementation using simple multicast. - * This is the representation of a multicast membership service. - * This class is responsible for maintaining a list of active cluster nodes in the cluster. - * If a node fails to send out a heartbeat, the node will be dismissed. + * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast membership + * service. This class is responsible for maintaining a list of active cluster nodes in the cluster. If a node fails to + * send out a heartbeat, the node will be dismissed. */ -public class McastService - extends MembershipServiceBase implements MessageListener, McastServiceMBean { +public class McastService extends MembershipServiceBase implements MessageListener, McastServiceMBean { private static final Log log = LogFactory.getLog(McastService.class); @@ -65,7 +63,7 @@ public class McastService /** * The local member */ - protected MemberImpl localMember ; + protected MemberImpl localMember; private int mcastSoTimeout; private int mcastTTL; @@ -82,31 +80,33 @@ public class McastService * Create a membership service. */ public McastService() { - //default values + // default values setDefaults(this.properties); } /** * Sets the properties for the membership service. - * @param properties - * <br>All are required<br> - * 1. mcastPort - the port to listen to<BR> - * 2. mcastAddress - the mcast group address<BR> - * 4. bindAddress - the bind address if any - only one that can be null<BR> - * 5. memberDropTime - the time a member is gone before it is considered gone.<BR> - * 6. mcastFrequency - the frequency of sending messages<BR> - * 7. tcpListenPort - the port this member listens to<BR> - * 8. tcpListenHost - the bind address of this member<BR> + * + * @param properties <br> + * All are required<br> + * 1. mcastPort - the port to listen to<BR> + * 2. mcastAddress - the mcast group address<BR> + * 4. bindAddress - the bind address if any - only one that can be null<BR> + * 5. memberDropTime - the time a member is gone before it is considered gone.<BR> + * 6. mcastFrequency - the frequency of sending messages<BR> + * 7. tcpListenPort - the port this member listens to<BR> + * 8. tcpListenHost - the bind address of this member<BR> + * * @exception java.lang.IllegalArgumentException if a property is missing. */ @Override public void setProperties(Properties properties) { - hasProperty(properties,"mcastPort"); - hasProperty(properties,"mcastAddress"); - hasProperty(properties,"memberDropTime"); - hasProperty(properties,"mcastFrequency"); - hasProperty(properties,"tcpListenPort"); - hasProperty(properties,"tcpListenHost"); + hasProperty(properties, "mcastPort"); + hasProperty(properties, "mcastAddress"); + hasProperty(properties, "memberDropTime"); + hasProperty(properties, "mcastFrequency"); + hasProperty(properties, "tcpListenPort"); + hasProperty(properties, "tcpListenHost"); setDefaults(properties); this.properties = properties; } @@ -116,23 +116,23 @@ public class McastService */ @Override public String getLocalMemberName() { - return localMember.toString() ; + return localMember.toString(); } @Override public Member getLocalMember(boolean alive) { - if ( alive && localMember != null && impl != null) { - localMember.setMemberAliveTime(System.currentTimeMillis()-impl.getServiceStartTime()); + if (alive && localMember != null && impl != null) { + localMember.setMemberAliveTime(System.currentTimeMillis() - impl.getServiceStartTime()); } return localMember; } @Override public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) { - properties.setProperty("tcpListenHost",listenHost); - properties.setProperty("tcpListenPort",String.valueOf(listenPort)); - properties.setProperty("udpListenPort",String.valueOf(udpPort)); - properties.setProperty("tcpSecurePort",String.valueOf(securePort)); + properties.setProperty("tcpListenHost", listenHost); + properties.setProperty("tcpListenPort", String.valueOf(listenPort)); + properties.setProperty("udpListenPort", String.valueOf(udpPort)); + properties.setProperty("tcpSecurePort", String.valueOf(securePort)); try { if (localMember != null) { localMember.setHostname(listenHost); @@ -147,7 +147,7 @@ public class McastService localMember.setSecurePort(securePort); localMember.setUdpPort(udpPort); localMember.getData(true, true); - }catch ( IOException x ) { + } catch (IOException x) { throw new IllegalArgumentException(x); } } @@ -183,9 +183,9 @@ public class McastService } @Override - public int getRecoveryCounter(){ + public int getRecoveryCounter() { String p = properties.getProperty("recoveryCounter"); - if(p != null){ + if (p != null) { return Integer.parseInt(p); } return -1; @@ -198,7 +198,7 @@ public class McastService @Override public boolean getRecoveryEnabled() { String p = properties.getProperty("recoveryEnabled"); - if(p != null){ + if (p != null) { return Boolean.parseBoolean(p); } return false; @@ -209,22 +209,22 @@ public class McastService } @Override - public long getRecoverySleepTime(){ + public long getRecoverySleepTime() { String p = properties.getProperty("recoverySleepTime"); - if(p != null){ + if (p != null) { return Long.parseLong(p); } return -1; } public void setLocalLoopbackDisabled(boolean localLoopbackDisabled) { - properties.setProperty("localLoopbackDisabled",String.valueOf(localLoopbackDisabled)); + properties.setProperty("localLoopbackDisabled", String.valueOf(localLoopbackDisabled)); } @Override public boolean getLocalLoopbackDisabled() { String p = properties.getProperty("localLoopbackDisabled"); - if(p != null){ + if (p != null) { return Boolean.parseBoolean(p); } return false; @@ -249,6 +249,7 @@ public class McastService public void setMcastDropTime(long time) { setDropTime(time); } + public void setDropTime(long time) { properties.setProperty("memberDropTime", String.valueOf(time)); } @@ -261,28 +262,29 @@ public class McastService /** * Check if a required property is available. + * * @param properties The set of properties - * @param name The property to check for + * @param name The property to check for */ - protected void hasProperty(Properties properties, String name){ - if ( properties.getProperty(name)==null) { + protected void hasProperty(Properties properties, String name) { + if (properties.getProperty(name) == null) { throw new IllegalArgumentException(sm.getString("mcastService.missing.property", name)); } } @Override public void start(int level) throws Exception { - hasProperty(properties,"mcastPort"); - hasProperty(properties,"mcastAddress"); - hasProperty(properties,"memberDropTime"); - hasProperty(properties,"mcastFrequency"); - hasProperty(properties,"tcpListenPort"); - hasProperty(properties,"tcpListenHost"); - hasProperty(properties,"tcpSecurePort"); - hasProperty(properties,"udpListenPort"); + hasProperty(properties, "mcastPort"); + hasProperty(properties, "mcastAddress"); + hasProperty(properties, "memberDropTime"); + hasProperty(properties, "mcastFrequency"); + hasProperty(properties, "tcpListenPort"); + hasProperty(properties, "tcpListenHost"); + hasProperty(properties, "tcpSecurePort"); + hasProperty(properties, "udpListenPort"); - if ( impl != null ) { + if (impl != null) { impl.start(level); return; } @@ -291,7 +293,7 @@ public class McastService int securePort = Integer.parseInt(getProperties().getProperty("tcpSecurePort")); int udpPort = Integer.parseInt(getProperties().getProperty("udpListenPort")); - if ( localMember == null ) { + if (localMember == null) { localMember = new MemberImpl(host, port, 100); localMember.setUniqueId(UUIDGenerator.randomUUID(true)); localMember.setLocal(true); @@ -302,46 +304,39 @@ public class McastService } localMember.setSecurePort(securePort); localMember.setUdpPort(udpPort); - if ( this.payload != null ) { + if (this.payload != null) { localMember.setPayload(payload); } - if ( this.domain != null ) { + if (this.domain != null) { localMember.setDomain(domain); } localMember.setServiceStartTime(System.currentTimeMillis()); java.net.InetAddress bind = null; - if ( properties.getProperty("mcastBindAddress")!= null ) { + if (properties.getProperty("mcastBindAddress") != null) { bind = java.net.InetAddress.getByName(properties.getProperty("mcastBindAddress")); } int ttl = -1; int soTimeout = -1; - if ( properties.getProperty("mcastTTL") != null ) { + if (properties.getProperty("mcastTTL") != null) { try { ttl = Integer.parseInt(properties.getProperty("mcastTTL")); - } catch ( Exception x ) { - log.error(sm.getString("McastService.parseTTL", - properties.getProperty("mcastTTL")), x); + } catch (Exception x) { + log.error(sm.getString("McastService.parseTTL", properties.getProperty("mcastTTL")), x); } } - if ( properties.getProperty("mcastSoTimeout") != null ) { + if (properties.getProperty("mcastSoTimeout") != null) { try { soTimeout = Integer.parseInt(properties.getProperty("mcastSoTimeout")); - } catch ( Exception x ) { - log.error(sm.getString("McastService.parseSoTimeout", - properties.getProperty("mcastSoTimeout")), x); + } catch (Exception x) { + log.error(sm.getString("McastService.parseSoTimeout", properties.getProperty("mcastSoTimeout")), x); } } - impl = new McastServiceImpl(localMember,Long.parseLong(properties.getProperty("mcastFrequency")), - Long.parseLong(properties.getProperty("memberDropTime")), - Integer.parseInt(properties.getProperty("mcastPort")), - bind, - java.net.InetAddress.getByName(properties.getProperty("mcastAddress")), - ttl, - soTimeout, - this, - this, - Boolean.parseBoolean(properties.getProperty("localLoopbackDisabled"))); + impl = new McastServiceImpl(localMember, Long.parseLong(properties.getProperty("mcastFrequency")), + Long.parseLong(properties.getProperty("memberDropTime")), + Integer.parseInt(properties.getProperty("mcastPort")), bind, + java.net.InetAddress.getByName(properties.getProperty("mcastAddress")), ttl, soTimeout, this, this, + Boolean.parseBoolean(properties.getProperty("localLoopbackDisabled"))); impl.setMembershipService(this); String value = properties.getProperty("recoveryEnabled"); boolean recEnabled = Boolean.parseBoolean(value); @@ -367,8 +362,8 @@ public class McastService */ @Override public void stop(int svc) { - try { - if ( impl != null && impl.stop(svc) ) { + try { + if (impl != null && impl.stop(svc)) { if (oname != null) { JmxRegistry.getRegistry(channel).unregisterJmx(oname); oname = null; @@ -377,9 +372,8 @@ public class McastService impl = null; channel = null; } - } catch ( Exception x) { - log.error(sm.getString( - "McastService.stopFail", Integer.valueOf(svc)), x); + } catch (Exception x) { + log.error(sm.getString("McastService.stopFail", Integer.valueOf(svc)), x); } } @@ -393,7 +387,7 @@ public class McastService @Override public void messageReceived(ChannelMessage msg) { - if (msglistener!=null && msglistener.accept(msg)) { + if (msglistener != null && msglistener.accept(msg)) { msglistener.messageReceived(msg); } } @@ -402,19 +396,19 @@ public class McastService public boolean accept(ChannelMessage msg) { return true; } + @Override public void broadcast(ChannelMessage message) throws ChannelException { - if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) { + if (impl == null || (impl.startLevel & Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ) { throw new ChannelException(sm.getString("mcastService.noStart")); } - byte[] data = XByteBuffer.createDataPackage((ChannelData)message); - if (data.length>McastServiceImpl.MAX_PACKET_SIZE) { - throw new ChannelException(sm.getString("mcastService.exceed.maxPacketSize", - Integer.toString(data.length) , + byte[] data = XByteBuffer.createDataPackage((ChannelData) message); + if (data.length > McastServiceImpl.MAX_PACKET_SIZE) { + throw new ChannelException(sm.getString("mcastService.exceed.maxPacketSize", Integer.toString(data.length), Integer.toString(McastServiceImpl.MAX_PACKET_SIZE))); } - DatagramPacket packet = new DatagramPacket(data,0,data.length); + DatagramPacket packet = new DatagramPacket(data, 0, data.length); try { impl.send(false, packet); } catch (Exception x) { @@ -454,13 +448,13 @@ public class McastService @Override public void setPayload(byte[] payload) { this.payload = payload; - if ( localMember != null ) { + if (localMember != null) { localMember.setPayload(payload); try { if (impl != null) { impl.send(false); } - }catch ( Exception x ) { + } catch (Exception x) { log.error(sm.getString("McastService.payload"), x); } } @@ -469,23 +463,23 @@ public class McastService @Override public void setDomain(byte[] domain) { this.domain = domain; - if ( localMember != null ) { + if (localMember != null) { localMember.setDomain(domain); try { if (impl != null) { impl.send(false); } - }catch ( Exception x ) { + } catch (Exception x) { log.error(sm.getString("McastService.domain"), x); } } } public void setDomain(String domain) { - if ( domain == null ) { + if (domain == null) { return; } - if ( domain.startsWith("{") ) { + if (domain.startsWith("{")) { setDomain(Arrays.fromString(domain)); } else { setDomain(Arrays.convert(domain)); @@ -500,16 +494,16 @@ public class McastService protected void setDefaults(Properties properties) { // default values if (properties.getProperty("mcastPort") == null) { - properties.setProperty("mcastPort","45564"); + properties.setProperty("mcastPort", "45564"); } if (properties.getProperty("mcastAddress") == null) { - properties.setProperty("mcastAddress","228.0.0.4"); + properties.setProperty("mcastAddress", "228.0.0.4"); } if (properties.getProperty("memberDropTime") == null) { - properties.setProperty("memberDropTime","3000"); + properties.setProperty("memberDropTime", "3000"); } if (properties.getProperty("mcastFrequency") == null) { - properties.setProperty("mcastFrequency","500"); + properties.setProperty("mcastFrequency", "500"); } if (properties.getProperty("recoveryCounter") == null) { properties.setProperty("recoveryCounter", "10"); @@ -527,24 +521,26 @@ public class McastService /** * Simple test program + * * @param args Command-line arguments + * * @throws Exception If an error occurs */ public static void main(String args[]) throws Exception { McastService service = new McastService(); Properties p = new Properties(); - p.setProperty("mcastPort","5555"); - p.setProperty("mcastAddress","224.10.10.10"); - p.setProperty("mcastClusterDomain","catalina"); - p.setProperty("bindAddress","localhost"); - p.setProperty("memberDropTime","3000"); - p.setProperty("mcastFrequency","500"); - p.setProperty("tcpListenPort","4000"); - p.setProperty("tcpListenHost","127.0.0.1"); - p.setProperty("tcpSecurePort","4100"); - p.setProperty("udpListenPort","4200"); + p.setProperty("mcastPort", "5555"); + p.setProperty("mcastAddress", "224.10.10.10"); + p.setProperty("mcastClusterDomain", "catalina"); + p.setProperty("bindAddress", "localhost"); + p.setProperty("memberDropTime", "3000"); + p.setProperty("mcastFrequency", "500"); + p.setProperty("tcpListenPort", "4000"); + p.setProperty("tcpListenHost", "127.0.0.1"); + p.setProperty("tcpSecurePort", "4100"); + p.setProperty("udpListenPort", "4200"); service.setProperties(p); service.start(); - Thread.sleep(60*1000*60); + Thread.sleep(60 * 1000 * 60); } } diff --git a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java index c15da0ec21..108abb7cc7 100644 --- a/java/org/apache/catalina/tribes/membership/McastServiceImpl.java +++ b/java/org/apache/catalina/tribes/membership/McastServiceImpl.java @@ -40,13 +40,11 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * A <b>membership</b> implementation using simple multicast. - * This is the representation of a multicast membership service. - * This class is responsible for maintaining a list of active cluster nodes in the cluster. - * If a node fails to send out a heartbeat, the node will be dismissed. - * This is the low level implementation that handles the multicasting sockets. - * Need to fix this, could use java.nio and only need one thread to send and receive, or - * just use a timeout on the receive + * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast membership + * service. This class is responsible for maintaining a list of active cluster nodes in the cluster. If a node fails to + * send out a heartbeat, the node will be dismissed. This is the low level implementation that handles the multicasting + * sockets. Need to fix this, could use java.nio and only need one thread to send and receive, or just use a timeout on + * the receive */ public class McastServiceImpl extends MembershipProviderBase { @@ -148,32 +146,24 @@ public class McastServiceImpl extends MembershipProviderBase { /** * Create a new mcast service instance. - * @param member - the local member - * @param sendFrequency - the time (ms) in between pings sent out - * @param expireTime - the time (ms) for a member to expire - * @param port - the mcast port - * @param bind - the bind address (not sure this is used yet) - * @param mcastAddress - the mcast address - * @param ttl multicast ttl that will be set on the socket - * @param soTimeout Socket timeout - * @param service - the callback service - * @param msgservice Message listener + * + * @param member - the local member + * @param sendFrequency - the time (ms) in between pings sent out + * @param expireTime - the time (ms) for a member to expire + * @param port - the mcast port + * @param bind - the bind address (not sure this is used yet) + * @param mcastAddress - the mcast address + * @param ttl multicast ttl that will be set on the socket + * @param soTimeout Socket timeout + * @param service - the callback service + * @param msgservice Message listener * @param localLoopbackDisabled - disable loopbackMode + * * @throws IOException Init error */ - public McastServiceImpl( - MemberImpl member, - long sendFrequency, - long expireTime, - int port, - InetAddress bind, - InetAddress mcastAddress, - int ttl, - int soTimeout, - MembershipListener service, - MessageListener msgservice, - boolean localLoopbackDisabled) - throws IOException { + public McastServiceImpl(MemberImpl member, long sendFrequency, long expireTime, int port, InetAddress bind, + InetAddress mcastAddress, int ttl, int soTimeout, MembershipListener service, MessageListener msgservice, + boolean localLoopbackDisabled) throws IOException { this.member = member; this.address = mcastAddress; this.port = port; @@ -190,14 +180,14 @@ public class McastServiceImpl extends MembershipProviderBase { public void init() throws IOException { setupSocket(); - sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE); + sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); sendPacket.setAddress(address); sendPacket.setPort(port); - receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE],MAX_PACKET_SIZE); + receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); receivePacket.setAddress(address); receivePacket.setPort(port); member.setCommand(new byte[0]); - if ( membership == null ) { + if (membership == null) { membership = new Membership(member); } } @@ -206,12 +196,11 @@ public class McastServiceImpl extends MembershipProviderBase { if (mcastBindAddress != null) { try { log.info(sm.getString("mcastServiceImpl.bind", address, Integer.toString(port))); - socket = new MulticastSocket(new InetSocketAddress(address,port)); + socket = new MulticastSocket(new InetSocketAddress(address, port)); } catch (BindException e) { /* - * On some platforms (e.g. Linux) it is not possible to bind - * to the multicast address. In this case only bind to the - * port. + * On some platforms (e.g. Linux) it is not possible to bind to the multicast address. In this case only + * bind to the port. */ log.info(sm.getString("mcastServiceImpl.bind.failed")); socket = new MulticastSocket(port); @@ -222,24 +211,23 @@ public class McastServiceImpl extends MembershipProviderBase { // Hint if we want disable loop back(local machine) messages JreCompat.getInstance().setSocketoptionIpMulticastLoop(socket, !localLoopbackDisabled); if (mcastBindAddress != null) { - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info(sm.getString("mcastServiceImpl.setInterface", mcastBindAddress)); } NetworkInterface networkInterface = NetworkInterface.getByInetAddress(mcastBindAddress); socket.setNetworkInterface(networkInterface); - } //end if - //force a so timeout so that we don't block forever + } // end if + // force a so timeout so that we don't block forever if (mcastSoTimeout <= 0) { - mcastSoTimeout = (int)sendFrequency; + mcastSoTimeout = (int) sendFrequency; } if (log.isInfoEnabled()) { - log.info(sm.getString("mcastServiceImpl.setSoTimeout", - Integer.toString(mcastSoTimeout))); + log.info(sm.getString("mcastServiceImpl.setSoTimeout", Integer.toString(mcastSoTimeout))); } socket.setSoTimeout(mcastSoTimeout); - if ( mcastTTL >= 0 ) { - if(log.isInfoEnabled()) { + if (mcastTTL >= 0) { + if (log.isInfoEnabled()) { log.info(sm.getString("mcastServiceImpl.setTTL", Integer.toString(mcastTTL))); } socket.setTimeToLive(mcastTTL); @@ -250,15 +238,15 @@ public class McastServiceImpl extends MembershipProviderBase { @Override public synchronized void start(int level) throws IOException { boolean valid = false; - if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) { - if ( receiver != null ) { + if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) { + if (receiver != null) { throw new IllegalStateException(sm.getString("mcastServiceImpl.receive.running")); } try { - if ( sender == null ) { + if (sender == null) { socket.joinGroup(new InetSocketAddress(address, 0), null); } - }catch (IOException iox) { + } catch (IOException iox) { log.error(sm.getString("mcastServiceImpl.unable.join")); throw iox; } @@ -268,38 +256,41 @@ public class McastServiceImpl extends MembershipProviderBase { receiver.start(); valid = true; } - if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { - if ( sender != null ) { + if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) { + if (sender != null) { throw new IllegalStateException(sm.getString("mcastServiceImpl.send.running")); } - if ( receiver == null ) { + if (receiver == null) { socket.joinGroup(new InetSocketAddress(address, 0), null); } - //make sure at least one packet gets out there + // make sure at least one packet gets out there send(false); doRunSender = true; sender = new SenderThread(sendFrequency); sender.setDaemon(true); sender.start(); - //we have started the receiver, but not yet waited for membership to establish + // we have started the receiver, but not yet waited for membership to establish valid = true; } if (!valid) { throw new IllegalArgumentException(sm.getString("mcastServiceImpl.invalid.startLevel")); } - //pause, once or twice + // pause, once or twice waitForMembers(level); startLevel = (startLevel | level); } private void waitForMembers(int level) { - long memberwait = sendFrequency*2; - if(log.isInfoEnabled()) { - log.info(sm.getString("mcastServiceImpl.waitForMembers.start", - Long.toString(memberwait), Integer.toString(level))); + long memberwait = sendFrequency * 2; + if (log.isInfoEnabled()) { + log.info(sm.getString("mcastServiceImpl.waitForMembers.start", Long.toString(memberwait), + Integer.toString(level))); + } + try { + Thread.sleep(memberwait); + } catch (InterruptedException ignore) { } - try {Thread.sleep(memberwait);}catch (InterruptedException ignore){} - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info(sm.getString("mcastServiceImpl.waitForMembers.done", Integer.toString(level))); } } @@ -308,18 +299,18 @@ public class McastServiceImpl extends MembershipProviderBase { public synchronized boolean stop(int level) throws IOException { boolean valid = false; - if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) { + if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) { valid = true; doRunReceiver = false; - if ( receiver !=null ) { + if (receiver != null) { receiver.interrupt(); } receiver = null; } - if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) { + if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) { valid = true; doRunSender = false; - if ( sender != null ) { + if (sender != null) { sender.interrupt(); } sender = null; @@ -329,17 +320,17 @@ public class McastServiceImpl extends MembershipProviderBase { throw new IllegalArgumentException(sm.getString("mcastServiceImpl.invalid.stopLevel")); } startLevel = (startLevel & (~level)); - //we're shutting down, send a shutdown message and close the socket - if ( startLevel == 0 ) { - //send a stop message + // we're shutting down, send a shutdown message and close the socket + if (startLevel == 0) { + // send a stop message member.setCommand(Member.SHUTDOWN_PAYLOAD); send(false); - //leave mcast group + // leave mcast group try { socket.leaveGroup(new InetSocketAddress(address, 0), null); - } catch ( Exception ignore) { - // NO-OP - } + } catch (Exception ignore) { + // NO-OP + } try { socket.close(); } catch (Exception ignore) { @@ -352,6 +343,7 @@ public class McastServiceImpl extends MembershipProviderBase { /** * Receive a datagram packet, locking wait + * * @throws IOException Received failed */ public void receive() throws IOException { @@ -359,23 +351,22 @@ public class McastServiceImpl extends MembershipProviderBase { try { socket.receive(receivePacket); - if(receivePacket.getLength() > MAX_PACKET_SIZE) { - log.error(sm.getString("mcastServiceImpl.packet.tooLong", - Integer.toString(receivePacket.getLength()))); + if (receivePacket.getLength() > MAX_PACKET_SIZE) { + log.error(sm.getString("mcastServiceImpl.packet.tooLong", Integer.toString(receivePacket.getLength()))); } else { byte[] data = new byte[receivePacket.getLength()]; System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); - if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) { + if (XByteBuffer.firstIndexOf(data, 0, MemberImpl.TRIBES_MBR_BEGIN) == 0) { memberDataReceived(data); } else { memberBroadcastsReceived(data); } } - } catch (SocketTimeoutException x ) { - //do nothing, this is normal, we don't want to block forever - //since the receive thread is the same thread - //that does membership expiration + } catch (SocketTimeoutException x) { + // do nothing, this is normal, we don't want to block forever + // since the receive thread is the same thread + // that does membership expiration } if (checkexpired) { checkExpired(); @@ -426,11 +417,11 @@ public class McastServiceImpl extends MembershipProviderBase { if (log.isTraceEnabled()) { log.trace("Mcast received broadcasts."); } - XByteBuffer buffer = new XByteBuffer(b,true); - if (buffer.countPackages(true)>0) { + XByteBuffer buffer = new XByteBuffer(b, true); + if (buffer.countPackages(true) > 0) { int count = buffer.countPackages(); final ChannelData[] data = new ChannelData[count]; - for (int i=0; i<count; i++) { + for (int i = 0; i < count; i++) { try { data[i] = buffer.extractPackage(true); } catch (IllegalStateException ise) { @@ -466,6 +457,7 @@ public class McastServiceImpl extends MembershipProviderBase { } protected final Object expiredMutex = new Object(); + protected void checkExpired() { synchronized (expiredMutex) { Member[] expired = membership.expire(timeToExpiration); @@ -494,42 +486,44 @@ public class McastServiceImpl extends MembershipProviderBase { /** * Send a ping. + * * @param checkexpired <code>true</code> to check for expiration + * * @throws IOException Send error */ public void send(boolean checkexpired) throws IOException { - send(checkexpired,null); + send(checkexpired, null); } private final Object sendLock = new Object(); public void send(boolean checkexpired, DatagramPacket packet) throws IOException { - checkexpired = (checkexpired && (packet==null)); - //ignore if we haven't started the sender - //if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return; - if (packet==null) { + checkexpired = (checkexpired && (packet == null)); + // ignore if we haven't started the sender + // if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return; + if (packet == null) { member.inc(); - if(log.isTraceEnabled()) { + if (log.isTraceEnabled()) { log.trace("Mcast send ping from member " + member); } byte[] data = member.getData(); - packet = new DatagramPacket(data,data.length); + packet = new DatagramPacket(data, data.length); } else if (log.isTraceEnabled()) { - log.trace("Sending message broadcast "+packet.getLength()+ " bytes from "+ member); + log.trace("Sending message broadcast " + packet.getLength() + " bytes from " + member); } packet.setAddress(address); packet.setPort(port); - //TODO this operation is not thread safe + // TODO this operation is not thread safe synchronized (sendLock) { socket.send(packet); } - if ( checkexpired ) { + if (checkexpired) { checkExpired(); } } public long getServiceStartTime() { - return (member!=null) ? member.getServiceStartTime() : -1l; + return (member != null) ? member.getServiceStartTime() : -1l; } public int getRecoveryCounter() { @@ -554,6 +548,7 @@ public class McastServiceImpl extends MembershipProviderBase { public class ReceiverThread extends Thread { int errorCounter = 0; + public ReceiverThread() { super(); String channelName = ""; @@ -562,21 +557,22 @@ public class McastServiceImpl extends MembershipProviderBase { } setName("Tribes-MembershipReceiver" + channelName); } + @Override public void run() { - while ( doRunReceiver ) { + while (doRunReceiver) { try { receive(); - errorCounter=0; - } catch ( ArrayIndexOutOfBoundsException ax ) { - //we can ignore this, as it means we have an invalid package - //but we will log it to debug - if ( log.isDebugEnabled() ) { + errorCounter = 0; + } catch (ArrayIndexOutOfBoundsException ax) { + // we can ignore this, as it means we have an invalid package + // but we will log it to debug + if (log.isDebugEnabled()) { log.debug(sm.getString("mcastServiceImpl.invalidMemberPackage"), ax); } - } catch ( Exception x ) { - if (errorCounter==0 && doRunReceiver) { - log.warn(sm.getString("mcastServiceImpl.error.receiving"),x); + } catch (Exception x) { + if (errorCounter == 0 && doRunReceiver) { + log.warn(sm.getString("mcastServiceImpl.error.receiving"), x); } else if (log.isDebugEnabled()) { if (doRunReceiver) { log.debug(sm.getString("mcastServiceImpl.error.receiving"), x); @@ -587,22 +583,23 @@ public class McastServiceImpl extends MembershipProviderBase { if (doRunReceiver) { try { sleep(500); - } catch (Exception ignore){ + } catch (Exception ignore) { // Ignore } - if ( (++errorCounter)>=recoveryCounter ) { - errorCounter=0; + if ((++errorCounter) >= recoveryCounter) { + errorCounter = 0; RecoveryThread.recover(McastServiceImpl.this); } } } } } - }//class ReceiverThread + }// class ReceiverThread public class SenderThread extends Thread { final long time; - int errorCounter=0; + int errorCounter = 0; + public SenderThread(long time) { this.time = time; String channelName = ""; @@ -612,20 +609,21 @@ public class McastServiceImpl extends MembershipProviderBase { setName("Tribes-MembershipSender" + channelName); } + @Override public void run() { - while ( doRunSender ) { + while (doRunSender) { try { send(true); errorCounter = 0; - } catch ( Exception x ) { - if (errorCounter==0) { - log.warn(sm.getString("mcastServiceImpl.send.failed"),x); + } catch (Exception x) { + if (errorCounter == 0) { + log.warn(sm.getString("mcastServiceImpl.send.failed"), x); } else { - log.debug(sm.getString("mcastServiceImpl.send.failed"),x); + log.debug(sm.getString("mcastServiceImpl.send.failed"), x); } - if ( (++errorCounter)>=recoveryCounter ) { - errorCounter=0; + if ((++errorCounter) >= recoveryCounter) { + errorCounter = 0; RecoveryThread.recover(McastServiceImpl.this); } } @@ -636,7 +634,7 @@ public class McastServiceImpl extends MembershipProviderBase { } } } - }//class SenderThread + }// class SenderThread protected static class RecoveryThread extends Thread { @@ -664,6 +662,7 @@ public class McastServiceImpl extends MembershipProviderBase { final McastServiceImpl parent; + public RecoveryThread(McastServiceImpl parent) { this.parent = parent; } @@ -677,6 +676,7 @@ public class McastServiceImpl extends MembershipProviderBase { return false; } } + public boolean startService() { try { parent.init(); @@ -687,34 +687,34 @@ public class McastServiceImpl extends MembershipProviderBase { return false; } } + @Override public void run() { boolean success = false; int attempt = 0; try { while (!success) { - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info(sm.getString("mcastServiceImpl.recovery")); } if (stopService() & startService()) { success = true; - if(log.isInfoEnabled()) { + if (log.isInfoEnabled()) { log.info(sm.getString("mcastServiceImpl.recovery.successful")); } } try { if (!success) { - if(log.isInfoEnabled()) { - log.info(sm.getString("mcastServiceImpl.recovery.failed", - Integer.toString(++attempt), + if (log.isInfoEnabled()) { + log.info(sm.getString("mcastServiceImpl.recovery.failed", Integer.toString(++attempt), Long.toString(parent.recoverySleepTime))); } sleep(parent.recoverySleepTime); } - }catch (InterruptedException ignore) { + } catch (InterruptedException ignore) { } } - }finally { + } finally { running.set(false); } } diff --git a/java/org/apache/catalina/tribes/membership/MemberImpl.java b/java/org/apache/catalina/tribes/membership/MemberImpl.java index 761a370b0d..712231904e 100644 --- a/java/org/apache/catalina/tribes/membership/MemberImpl.java +++ b/java/org/apache/catalina/tribes/membership/MemberImpl.java @@ -28,14 +28,13 @@ import org.apache.catalina.tribes.transport.SenderState; import org.apache.catalina.tribes.util.StringManager; /** - * A <b>membership</b> implementation using simple multicast. - * This is the representation of a multicast member. - * Carries the host, and port of the this or other cluster nodes. + * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast member. Carries + * the host, and port of the this or other cluster nodes. */ public class MemberImpl implements Member, java.io.Externalizable { - public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66, 1, 0}; - public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69, 1, 0}; + public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] { 84, 82, 73, 66, 69, 83, 45, 66, 1, 0 }; + public static final transient byte[] TRIBES_MBR_END = new byte[] { 84, 82, 73, 66, 69, 83, 45, 69, 1, 0 }; protected static final StringManager sm = StringManager.getManager(Constants.Package); /** @@ -63,8 +62,7 @@ public class MemberImpl implements Member, java.io.Externalizable { protected AtomicInteger msgCount = new AtomicInteger(0); /** - * The number of milliseconds since this member was - * created, is kept track of using the start time + * The number of milliseconds since this member was created, is kept track of using the start time */ protected volatile long memberAliveTime = 0; @@ -74,8 +72,7 @@ public class MemberImpl implements Member, java.io.Externalizable { protected transient long serviceStartTime; /** - * To avoid serialization over and over again, once the local dataPkg - * has been set, we use that to transmit data + * To avoid serialization over and over again, once the local dataPkg has been set, we use that to transmit data */ protected transient byte[] dataPkg = null; @@ -85,14 +82,13 @@ public class MemberImpl implements Member, java.io.Externalizable { protected volatile byte[] uniqueId = new byte[16]; /** - * Custom payload that an app framework can broadcast - * Also used to transport stop command. + * Custom payload that an app framework can broadcast Also used to transport stop command. */ protected volatile byte[] payload = new byte[0]; /** - * Command, so that the custom payload doesn't have to be used - * This is for internal tribes use, such as SHUTDOWN_COMMAND + * Command, so that the custom payload doesn't have to be used This is for internal tribes use, such as + * SHUTDOWN_COMMAND */ protected volatile byte[] command = new byte[0]; @@ -116,26 +112,20 @@ public class MemberImpl implements Member, java.io.Externalizable { /** * Construct a new member object. * - * @param host - the tcp listen host - * @param port - the tcp listen port + * @param host - the tcp listen host + * @param port - the tcp listen port * @param aliveTime - the number of milliseconds since this member was created * - * @throws IOException If there is an error converting the host name to an - * IP address + * @throws IOException If there is an error converting the host name to an IP address */ - public MemberImpl(String host, - int port, - long aliveTime) throws IOException { + public MemberImpl(String host, int port, long aliveTime) throws IOException { setHostname(host); this.port = port; - this.memberAliveTime=aliveTime; + this.memberAliveTime = aliveTime; } - public MemberImpl(String host, - int port, - long aliveTime, - byte[] payload) throws IOException { - this(host,port,aliveTime); + public MemberImpl(String host, int port, long aliveTime, byte[] payload) throws IOException { + this(host, port, aliveTime); setPayload(payload); } @@ -143,10 +133,12 @@ public class MemberImpl implements Member, java.io.Externalizable { public boolean isReady() { return SenderState.getSenderState(this).isReady(); } + @Override public boolean isSuspect() { return SenderState.getSenderState(this).isSuspect(); } + @Override public boolean isFailing() { return SenderState.getSenderState(this).isFailing(); @@ -160,44 +152,44 @@ public class MemberImpl implements Member, java.io.Externalizable { } /** - * Create a data package to send over the wire representing this member. - * This is faster than serialization. + * Create a data package to send over the wire representing this member. This is faster than serialization. + * * @return - the bytes for this member deserialized */ - public byte[] getData() { + public byte[] getData() { return getData(true); } @Override - public byte[] getData(boolean getalive) { - return getData(getalive,false); + public byte[] getData(boolean getalive) { + return getData(getalive, false); } @Override public synchronized int getDataLength() { - return TRIBES_MBR_BEGIN.length+ //start pkg - 4+ //data length - 8+ //alive time - 4+ //port - 4+ //secure port - 4+ //udp port - 1+ //host length - host.length+ //host - 4+ //command length - command.length+ //command - 4+ //domain length - domain.length+ //domain - 16+ //unique id - 4+ //payload length - payload.length+ //payload - TRIBES_MBR_END.length; //end pkg + return TRIBES_MBR_BEGIN.length + // start pkg + 4 + // data length + 8 + // alive time + 4 + // port + 4 + // secure port + 4 + // udp port + 1 + // host length + host.length + // host + 4 + // command length + command.length + // command + 4 + // domain length + domain.length + // domain + 16 + // unique id + 4 + // payload length + payload.length + // payload + TRIBES_MBR_END.length; // end pkg } @Override - public synchronized byte[] getData(boolean getalive, boolean reset) { + public synchronized byte[] getData(boolean getalive, boolean reset) { if (reset) { dataPkg = null; } @@ -214,84 +206,85 @@ public class MemberImpl implements Member, java.io.Externalizable { return dataPkg; } - //package looks like - //start package TRIBES_MBR_BEGIN.length - //package length - 4 bytes - //alive - 8 bytes - //port - 4 bytes - //secure port - 4 bytes - //udp port - 4 bytes - //host length - 1 byte - //host - hl bytes - //clen - 4 bytes - //command - clen bytes - //dlen - 4 bytes - //domain - dlen bytes - //uniqueId - 16 bytes - //payload length - 4 bytes - //payload plen bytes - //end package TRIBES_MBR_END.length - long alive=System.currentTimeMillis()-getServiceStartTime(); + // package looks like + // start package TRIBES_MBR_BEGIN.length + // package length - 4 bytes + // alive - 8 bytes + // port - 4 bytes + // secure port - 4 bytes + // udp port - 4 bytes + // host length - 1 byte + // host - hl bytes + // clen - 4 bytes + // command - clen bytes + // dlen - 4 bytes + // domain - dlen bytes + // uniqueId - 16 bytes + // payload length - 4 bytes + // payload plen bytes + // end package TRIBES_MBR_END.length + long alive = System.currentTimeMillis() - getServiceStartTime(); byte[] data = new byte[getDataLength()]; int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4); int pos = 0; - //TRIBES_MBR_BEGIN - System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length); + // TRIBES_MBR_BEGIN + System.arraycopy(TRIBES_MBR_BEGIN, 0, data, pos, TRIBES_MBR_BEGIN.length); pos += TRIBES_MBR_BEGIN.length; - //body length - XByteBuffer.toBytes(bodylength,data,pos); + // body length + XByteBuffer.toBytes(bodylength, data, pos); pos += 4; - //alive data - XByteBuffer.toBytes(alive,data,pos); + // alive data + XByteBuffer.toBytes(alive, data, pos); pos += 8; - //port - XByteBuffer.toBytes(port,data,pos); + // port + XByteBuffer.toBytes(port, data, pos); pos += 4; - //secure port - XByteBuffer.toBytes(securePort,data,pos); + // secure port + XByteBuffer.toBytes(securePort, data, pos); pos += 4; - //udp port - XByteBuffer.toBytes(udpPort,data,pos); + // udp port + XByteBuffer.toBytes(udpPort, data, pos); pos += 4; - //host length + // host length data[pos++] = (byte) host.length; - //host - System.arraycopy(host,0,data,pos,host.length); - pos+=host.length; - //clen - 4 bytes - XByteBuffer.toBytes(command.length,data,pos); - pos+=4; - //command - clen bytes - System.arraycopy(command,0,data,pos,command.length); - pos+=command.length; - //dlen - 4 bytes - XByteBuffer.toBytes(domain.length,data,pos); - pos+=4; - //domain - dlen bytes - System.arraycopy(domain,0,data,pos,domain.length); - pos+=domain.length; - //unique Id - System.arraycopy(uniqueId,0,data,pos,uniqueId.length); - pos+=uniqueId.length; - //payload - XByteBuffer.toBytes(payload.length,data,pos); - pos+=4; - System.arraycopy(payload,0,data,pos,payload.length); - pos+=payload.length; - - //TRIBES_MBR_END - System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length); + // host + System.arraycopy(host, 0, data, pos, host.length); + pos += host.length; + // clen - 4 bytes + XByteBuffer.toBytes(command.length, data, pos); + pos += 4; + // command - clen bytes + System.arraycopy(command, 0, data, pos, command.length); + pos += command.length; + // dlen - 4 bytes + XByteBuffer.toBytes(domain.length, data, pos); + pos += 4; + // domain - dlen bytes + System.arraycopy(domain, 0, data, pos, domain.length); + pos += domain.length; + // unique Id + System.arraycopy(uniqueId, 0, data, pos, uniqueId.length); + pos += uniqueId.length; + // payload + XByteBuffer.toBytes(payload.length, data, pos); + pos += 4; + System.arraycopy(payload, 0, data, pos, payload.length); + pos += payload.length; + + // TRIBES_MBR_END + System.arraycopy(TRIBES_MBR_END, 0, data, pos, TRIBES_MBR_END.length); pos += TRIBES_MBR_END.length; - //create local data + // create local data dataPkg = data; return data; } + /** * Deserializes a member from data sent over the wire. * @@ -301,50 +294,52 @@ public class MemberImpl implements Member, java.io.Externalizable { * @return The populated member object. */ public static Member getMember(byte[] data, MemberImpl member) { - return getMember(data,0,data.length,member); + return getMember(data, 0, data.length, member); } public static Member getMember(byte[] data, int offset, int length, MemberImpl member) { - //package looks like - //start package TRIBES_MBR_BEGIN.length - //package length - 4 bytes - //alive - 8 bytes - //port - 4 bytes - //secure port - 4 bytes - //udp port - 4 bytes - //host length - 1 byte - //host - hl bytes - //clen - 4 bytes - //command - clen bytes - //dlen - 4 bytes - //domain - dlen bytes - //uniqueId - 16 bytes - //payload length - 4 bytes - //payload plen bytes - //end package TRIBES_MBR_END.length + // package looks like + // start package TRIBES_MBR_BEGIN.length + // package length - 4 bytes + // alive - 8 bytes + // port - 4 bytes + // secure port - 4 bytes + // udp port - 4 bytes + // host length - 1 byte + // host - hl bytes + // clen - 4 bytes + // command - clen bytes + // dlen - 4 bytes + // domain - dlen bytes + // uniqueId - 16 bytes + // payload length - 4 bytes + // payload plen bytes + // end package TRIBES_MBR_END.length int pos = offset; - if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) { - throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.begin", org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN))); + if (XByteBuffer.firstIndexOf(data, offset, TRIBES_MBR_BEGIN) != pos) { + throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.begin", + org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN))); } - if ( length < (TRIBES_MBR_BEGIN.length+4) ) { + if (length < (TRIBES_MBR_BEGIN.length + 4)) { throw new ArrayIndexOutOfBoundsException(sm.getString("memberImpl.package.small")); } pos += TRIBES_MBR_BEGIN.length; - int bodylength = XByteBuffer.toInt(data,pos); + int bodylength = XByteBuffer.toInt(data, pos); pos += 4; - if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) { + if (length < (bodylength + 4 + TRIBES_MBR_BEGIN.length + TRIBES_MBR_END.length)) { throw new ArrayIndexOutOfBoundsException(sm.getString("memberImpl.notEnough.bytes")); } - int endpos = pos+bodylength; - if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) { - throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.end", org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END))); + int endpos = pos + bodylength; + if (XByteBuffer.firstIndexOf(data, endpos, TRIBES_MBR_END) != endpos) { + throw new IllegalArgumentException(sm.getString("memberImpl.invalid.package.end", + org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END))); } @@ -413,25 +408,25 @@ public class MemberImpl implements Member, java.io.Externalizable { } public static Member getMember(byte[] data) { - return getMember(data,new MemberImpl()); + return getMember(data, new MemberImpl()); } public static Member getMember(byte[] data, int offset, int length) { - return getMember(data,offset,length,new MemberImpl()); + return getMember(data, offset, length, new MemberImpl()); } @Override public String getName() { - return "tcp://"+getHostname()+":"+getPort(); + return "tcp://" + getHostname() + ":" + getPort(); } @Override - public int getPort() { + public int getPort() { return this.port; } @Override - public byte[] getHost() { + public byte[] getHost() { return host; } @@ -451,7 +446,7 @@ public class MemberImpl implements Member, java.io.Externalizable { @Override public long getMemberAliveTime() { - return memberAliveTime; + return memberAliveTime; } public long getServiceStartTime() { @@ -490,13 +485,12 @@ public class MemberImpl implements Member, java.io.Externalizable { @Override public void setMemberAliveTime(long time) { - memberAliveTime=time; + memberAliveTime = time; } - @Override - public String toString() { + public String toString() { StringBuilder buf = new StringBuilder(getClass().getName()); buf.append('['); buf.append(getName()).append(','); @@ -506,22 +500,24 @@ public class MemberImpl implements Member, java.io.Externalizable { buf.append("securePort=").append(securePort).append(", "); buf.append("UDP Port=").append(udpPort).append(", "); buf.append("id=").append(bToS(this.uniqueId)).append(", "); - buf.append("payload=").append(bToS(this.payload,8)).append(", "); - buf.append("command=").append(bToS(this.command,8)).append(", "); - buf.append("domain=").append(bToS(this.domain,8)); + buf.append("payload=").append(bToS(this.payload, 8)).append(", "); + buf.append("command=").append(bToS(this.command, 8)).append(", "); + buf.append("domain=").append(bToS(this.domain, 8)); buf.append(']'); return buf.toString(); } + public static String bToS(byte[] data) { - return bToS(data,data.length); + return bToS(data, data.length); } + public static String bToS(byte[] data, int max) { - StringBuilder buf = new StringBuilder(4*16); + StringBuilder buf = new StringBuilder(4 * 16); buf.append('{'); - for (int i=0; data!=null && i<data.length; i++ ) { + for (int i = 0; data != null && i < data.length; i++) { buf.append(String.valueOf(data[i])).append(' '); - if ( i==max ) { - buf.append("...("+data.length+")"); + if (i == max) { + buf.append("...(" + data.length + ")"); break; } } @@ -531,7 +527,7 @@ public class MemberImpl implements Member, java.io.Externalizable { @Override public int hashCode() { - return getHost()[0]+getHost()[1]+getHost()[2]+getHost()[3]; + return getHost()[0] + getHost()[1] + getHost()[2] + getHost()[3]; } /** @@ -541,10 +537,10 @@ public class MemberImpl implements Member, java.io.Externalizable { */ @Override public boolean equals(Object o) { - if ( o instanceof MemberImpl ) { - return Arrays.equals(this.getHost(),((MemberImpl)o).getHost()) && - this.getPort() == ((MemberImpl)o).getPort() && - Arrays.equals(this.getUniqueId(),((MemberImpl)o).getUniqueId()); + if (o instanceof MemberImpl) { + return Arrays.equals(this.getHost(), ((MemberImpl) o).getHost()) && + this.getPort() == ((MemberImpl) o).getPort() && + Arrays.equals(this.getUniqueId(), ((MemberImpl) o).getUniqueId()); } else { return false; } @@ -575,8 +571,8 @@ public class MemberImpl implements Member, java.io.Externalizable { } public synchronized void setUniqueId(byte[] uniqueId) { - this.uniqueId = uniqueId!=null?uniqueId:new byte[16]; - getData(true,true); + this.uniqueId = uniqueId != null ? uniqueId : new byte[16]; + getData(true, true); } @Override @@ -589,8 +585,8 @@ public class MemberImpl implements Member, java.io.Externalizable { } if (newPayloadLength > oldPayloadLength) { // It is possible that the max packet size will be exceeded - if ((newPayloadLength - oldPayloadLength + getData(false, false).length) > - McastServiceImpl.MAX_PACKET_SIZE) { + if ((newPayloadLength - oldPayloadLength + + getData(false, false).length) > McastServiceImpl.MAX_PACKET_SIZE) { throw new IllegalArgumentException(sm.getString("memberImpl.large.payload")); } } @@ -600,13 +596,13 @@ public class MemberImpl implements Member, java.io.Externalizable { @Override public synchronized void setCommand(byte[] command) { - this.command = command!=null?command:new byte[0]; - getData(true,true); + this.command = command != null ? command : new byte[0]; + getData(true, true); } public synchronized void setDomain(byte[] domain) { - this.domain = domain!=null?domain:new byte[0]; - getData(true,true); + this.domain = domain != null ? domain : new byte[0]; + getData(true, true); } public synchronized void setSecurePort(int securePort) { @@ -634,7 +630,7 @@ public class MemberImpl implements Member, java.io.Externalizable { int length = in.readInt(); byte[] message = new byte[length]; in.readFully(message); - getMember(message,this); + getMember(message, this); } diff --git a/java/org/apache/catalina/tribes/membership/Membership.java b/java/org/apache/catalina/tribes/membership/Membership.java index 9aaffaff54..0e735ebe78 100644 --- a/java/org/apache/catalina/tribes/membership/Membership.java +++ b/java/org/apache/catalina/tribes/membership/Membership.java @@ -24,10 +24,9 @@ import java.util.HashMap; import org.apache.catalina.tribes.Member; /** - * A <b>membership</b> implementation using simple multicast. - * This is the representation of a multicast membership. - * This class is responsible for maintaining a list of active cluster nodes in the cluster. - * If a node fails to send out a heartbeat, the node will be dismissed. + * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast membership. This + * class is responsible for maintaining a list of active cluster nodes in the cluster. If a node fails to send out a + * heartbeat, the node will be dismissed. * * @author Peter Rossbach */ @@ -46,7 +45,7 @@ public class Membership implements Cloneable { /** * A map of all the members in the cluster. */ - protected HashMap<Member, MbrEntry> map = new HashMap<>(); // Guarded by membersLock + protected HashMap<Member,MbrEntry> map = new HashMap<>(); // Guarded by membersLock /** * A list of all the members in the cluster. @@ -72,7 +71,7 @@ public class Membership implements Cloneable { // Standard clone() method will copy the map object. Replace that // with a new map but with the same contents. @SuppressWarnings("unchecked") - final HashMap<Member, MbrEntry> tmpclone = (HashMap<Member, MbrEntry>) map.clone(); + final HashMap<Member,MbrEntry> tmpclone = (HashMap<Member,MbrEntry>) map.clone(); clone.map = tmpclone; // Standard clone() method will copy the array object. Replace that @@ -88,7 +87,9 @@ public class Membership implements Cloneable { /** * Constructs a new membership - * @param local - has to be the name of the local member. Used to filter the local member from the cluster membership + * + * @param local - has to be the name of the local member. Used to filter the local member from the cluster + * membership * @param includeLocal - TBA */ public Membership(Member local, boolean includeLocal) { @@ -112,13 +113,13 @@ public class Membership implements Cloneable { } /** - * Reset the membership and start over fresh. i.e., delete all the members - * and wait for them to ping again and join this membership. + * Reset the membership and start over fresh. i.e., delete all the members and wait for them to ping again and join + * this membership. */ public void reset() { synchronized (membersLock) { map.clear(); - members = EMPTY_MEMBERS ; + members = EMPTY_MEMBERS; } } @@ -126,8 +127,9 @@ public class Membership implements Cloneable { * Notify the membership that this member has announced itself. * * @param member - the member that just pinged us + * * @return - true if this member is new to the cluster, false otherwise.<br> - * - false if this member is the local member or updated. + * - false if this member is the local member or updated. */ public boolean memberAlive(Member member) { // Ignore ourselves @@ -171,7 +173,7 @@ public class Membership implements Cloneable { public MbrEntry addMember(Member member) { MbrEntry entry = new MbrEntry(member); synchronized (membersLock) { - if (!map.containsKey(member) ) { + if (!map.containsKey(member)) { map.put(member, entry); Member results[] = new Member[members.length + 1]; System.arraycopy(members, 0, results, 0, members.length); @@ -213,16 +215,17 @@ public class Membership implements Cloneable { } /** - * Runs a refresh cycle and returns a list of members that has expired. - * This also removes the members from the membership, in such a way that - * getMembers() = getMembers() - expire() + * Runs a refresh cycle and returns a list of members that has expired. This also removes the members from the + * membership, in such a way that getMembers() = getMembers() - expire() + * * @param maxtime - the max time a member can remain unannounced before it is considered dead. + * * @return the list of expired members */ public Member[] expire(long maxtime) { synchronized (membersLock) { if (!hasMembers()) { - return EMPTY_MEMBERS; + return EMPTY_MEMBERS; } ArrayList<Member> list = null; @@ -243,7 +246,7 @@ public class Membership implements Cloneable { } return result; } else { - return EMPTY_MEMBERS ; + return EMPTY_MEMBERS; } } } @@ -251,8 +254,7 @@ public class Membership implements Cloneable { /** * Returning that service has members or not. * - * @return <code>true</code> if there are one or more members, otherwise - * <code>false</code> + * @return <code>true</code> if there are one or more members, otherwise <code>false</code> */ public boolean hasMembers() { return members.length > 0; @@ -276,8 +278,7 @@ public class Membership implements Cloneable { } /** - * Returning a list of all the members in the membership. - * We not need a copy: add and remove generate new arrays. + * Returning a list of all the members in the membership. We not need a copy: add and remove generate new arrays. * * @return An array of the current members */ @@ -297,14 +298,14 @@ public class Membership implements Cloneable { protected long lastHeardFrom; public MbrEntry(Member mbr) { - this.mbr = mbr; + this.mbr = mbr; } /** * Indicate that this member has been accessed. */ - public void accessed(){ - lastHeardFrom = System.currentTimeMillis(); + public void accessed() { + lastHeardFrom = System.currentTimeMillis(); } /** @@ -321,8 +322,7 @@ public class Membership implements Cloneable { * * @param maxtime The time threshold * - * @return <code>true</code> if the member has expired, otherwise - * <code>false</code> + * @return <code>true</code> if the member has expired, otherwise <code>false</code> */ public boolean hasExpired(long maxtime) { return !mbr.isLocal() && (System.currentTimeMillis() - lastHeardFrom) > maxtime; diff --git a/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java b/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java index 8c3b93e85a..d77f495417 100644 --- a/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java +++ b/java/org/apache/catalina/tribes/membership/MembershipProviderBase.java @@ -38,7 +38,7 @@ public abstract class MembershipProviderBase implements MembershipProvider { @Override public boolean hasMembers() { - if (membership == null ) { + if (membership == null) { return false; } return membership.hasMembers(); diff --git a/java/org/apache/catalina/tribes/membership/MembershipServiceBase.java b/java/org/apache/catalina/tribes/membership/MembershipServiceBase.java index b05b9cc4f6..fd7093e4a9 100644 --- a/java/org/apache/catalina/tribes/membership/MembershipServiceBase.java +++ b/java/org/apache/catalina/tribes/membership/MembershipServiceBase.java @@ -46,7 +46,7 @@ public abstract class MembershipServiceBase implements MembershipService, Member @Override public boolean hasMembers() { - if (getMembershipProvider() == null ) { + if (getMembershipProvider() == null) { return false; } return getMembershipProvider().hasMembers(); @@ -71,16 +71,16 @@ public abstract class MembershipServiceBase implements MembershipService, Member @Override public String[] getMembersByName() { Member[] currentMembers = getMembers(); - String [] membernames ; - if(currentMembers != null) { + String[] membernames; + if (currentMembers != null) { membernames = new String[currentMembers.length]; for (int i = 0; i < currentMembers.length; i++) { - membernames[i] = currentMembers[i].toString() ; + membernames[i] = currentMembers[i].toString(); } } else { - membernames = new String[0] ; + membernames = new String[0]; } - return membernames ; + return membernames; } @Override @@ -100,7 +100,7 @@ public abstract class MembershipServiceBase implements MembershipService, Member } @Override - public void removeMembershipListener(){ + public void removeMembershipListener() { listener = null; } diff --git a/java/org/apache/catalina/tribes/membership/StaticMember.java b/java/org/apache/catalina/tribes/membership/StaticMember.java index 39159f9b6d..da2c2f7a53 100644 --- a/java/org/apache/catalina/tribes/membership/StaticMember.java +++ b/java/org/apache/catalina/tribes/membership/StaticMember.java @@ -34,30 +34,34 @@ public class StaticMember extends MemberImpl { } /** - * @param host String, either in byte array string format, like {214,116,1,3} - * or as a regular hostname, 127.0.0.1 or tomcat01.mydomain.com + * @param host String, either in byte array string format, like {214,116,1,3} or as a regular hostname, 127.0.0.1 or + * tomcat01.mydomain.com */ public void setHost(String host) { - if ( host == null ) { + if (host == null) { return; } - if ( host.startsWith("{") ) { + if (host.startsWith("{")) { setHost(Arrays.fromString(host)); } else { - try { setHostname(host); }catch (IOException x) { throw new RuntimeException(x);} + try { + setHostname(host); + } catch (IOException x) { + throw new RuntimeException(x); + } } } /** - * @param domain String, either in byte array string format, like {214,116,1,3} - * or as a regular string value like 'mydomain'. The latter will be converted using ISO-8859-1 encoding + * @param domain String, either in byte array string format, like {214,116,1,3} or as a regular string value like + * 'mydomain'. The latter will be converted using ISO-8859-1 encoding */ public void setDomain(String domain) { - if ( domain == null ) { + if (domain == null) { return; } - if ( domain.startsWith("{") ) { + if (domain.startsWith("{")) { setDomain(Arrays.fromString(domain)); } else { setDomain(Arrays.convert(domain)); @@ -69,7 +73,7 @@ public class StaticMember extends MemberImpl { */ public void setUniqueId(String id) { byte[] uuid = Arrays.fromString(id); - if ( uuid==null || uuid.length != 16 ) { + if (uuid == null || uuid.length != 16) { throw new RuntimeException(sm.getString("staticMember.invalid.uuidLength", id)); } setUniqueId(uuid); diff --git a/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java b/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java index a4406b94c6..32ab1fe392 100644 --- a/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java +++ b/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java @@ -40,7 +40,8 @@ import org.apache.catalina.tribes.util.StringManager; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; -public class StaticMembershipProvider extends MembershipProviderBase implements RpcCallback, ChannelListener, Heartbeat { +public class StaticMembershipProvider extends MembershipProviderBase + implements RpcCallback, ChannelListener, Heartbeat { protected static final StringManager sm = StringManager.getManager(StaticMembershipProvider.class); private static final Log log = LogFactory.getLog(StaticMembershipProvider.class); @@ -82,20 +83,20 @@ public class StaticMembershipProvider extends MembershipProviderBase implements @Override public void start(int level) throws Exception { - if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) { - //no-op + if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { + // no-op } - if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) { - //no-op + if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { + // no-op } startLevel = (startLevel | level); if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) { startMembership(getAliveMembers(staticMembers.toArray(new Member[0]))); running = true; - if ( thread == null && useThread) { + if (thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); - thread.setName("StaticMembership.PingThread[" + this.channel.getName() +"]"); + thread.setName("StaticMembership.PingThread[" + this.channel.getName() + "]"); thread.start(); } } @@ -103,14 +104,14 @@ public class StaticMembershipProvider extends MembershipProviderBase implements @Override public boolean stop(int level) throws Exception { - if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) { + if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { // no-op } - if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) { + if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { // no-op } startLevel = (startLevel & (~level)); - if ( startLevel == 0 ) { + if (startLevel == 0) { running = false; if (thread != null) { thread.interrupt(); @@ -157,7 +158,7 @@ public class StaticMembershipProvider extends MembershipProviderBase implements protected void memberAdded(Member member) { Member mbr = setupMember(member); - if(membership.memberAlive(mbr)) { + if (membership.memberAlive(mbr)) { Runnable r = () -> { Thread currentThread = Thread.currentThread(); String name = currentThread.getName(); @@ -195,7 +196,7 @@ public class StaticMembershipProvider extends MembershipProviderBase implements } protected void stopMembership(Member[] members) { - if (members.length == 0 ) { + if (members.length == 0) { return; } Member localmember = service.getLocalMember(false); @@ -247,8 +248,7 @@ public class StaticMembershipProvider extends MembershipProviderBase implements } else { // other messages are ignored. if (log.isInfoEnabled()) { - log.info(sm.getString("staticMembershipProvider.replyRequest.ignored", - memMsg.getTypeDesc())); + log.info(sm.getString("staticMembershipProvider.replyRequest.ignored", memMsg.getTypeDesc())); } return null; } @@ -267,8 +267,7 @@ public class StaticMembershipProvider extends MembershipProviderBase implements } else { // other messages are ignored. if (log.isInfoEnabled()) { - log.info(sm.getString("staticMembershipProvider.leftOver.ignored", - memMsg.getTypeDesc())); + log.info(sm.getString("staticMembershipProvider.leftOver.ignored", memMsg.getTypeDesc())); } } } @@ -289,7 +288,8 @@ public class StaticMembershipProvider extends MembershipProviderBase implements Member[] members = getAliveMembers(staticMembers.toArray(new Member[0])); if (members.length > 0) { try { - MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true)); + MemberMessage msg = + new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true)); Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); for (Response response : resp) { messageReceived(response.getMessage(), response.getSource()); @@ -387,14 +387,14 @@ public class StaticMembershipProvider extends MembershipProviderBase implements protected String getTypeDesc() { switch (msgtype) { - case MSG_START: - return "MSG_START"; - case MSG_STOP: - return "MSG_STOP"; - case MSG_PING: - return "MSG_PING"; - default: - return "UNKNOWN"; + case MSG_START: + return "MSG_START"; + case MSG_STOP: + return "MSG_STOP"; + case MSG_PING: + return "MSG_PING"; + default: + return "UNKNOWN"; } } } @@ -406,9 +406,9 @@ public class StaticMembershipProvider extends MembershipProviderBase implements try { sleep(pingInterval); ping(); - }catch (InterruptedException ix) { - }catch (Exception x) { - log.warn(sm.getString("staticMembershipProvider.pingThread.failed"),x); + } catch (InterruptedException ix) { + } catch (Exception x) { + log.warn(sm.getString("staticMembershipProvider.pingThread.failed"), x); } } } diff --git a/java/org/apache/catalina/tribes/membership/StaticMembershipService.java b/java/org/apache/catalina/tribes/membership/StaticMembershipService.java index b82a05bfe0..3e00aa64c3 100644 --- a/java/org/apache/catalina/tribes/membership/StaticMembershipService.java +++ b/java/org/apache/catalina/tribes/membership/StaticMembershipService.java @@ -31,8 +31,7 @@ import org.apache.catalina.tribes.util.StringManager; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; -public class StaticMembershipService extends MembershipServiceBase - implements StaticMembershipServiceMBean { +public class StaticMembershipService extends MembershipServiceBase implements StaticMembershipServiceMBean { private static final Log log = LogFactory.getLog(StaticMembershipService.class); protected static final StringManager sm = StringManager.getManager(Constants.Package); @@ -47,7 +46,7 @@ public class StaticMembershipService extends MembershipServiceBase private ObjectName oname = null; public StaticMembershipService() { - //default values + // default values setDefaults(this.properties); } @@ -99,15 +98,14 @@ public class StaticMembershipService extends MembershipServiceBase @Override public Member getLocalMember(boolean incAliveTime) { - if ( incAliveTime && localMember != null) { - localMember.setMemberAliveTime(System.currentTimeMillis()-localMember.getServiceStartTime()); + if (incAliveTime && localMember != null) { + localMember.setMemberAliveTime(System.currentTimeMillis() - localMember.getServiceStartTime()); } return localMember; } @Override - public void setLocalMemberProperties(String listenHost, int listenPort, - int securePort, int udpPort) { + public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) { properties.setProperty("tcpListenHost", listenHost); properties.setProperty("tcpListenPort", String.valueOf(listenPort)); try { @@ -213,24 +211,24 @@ public class StaticMembershipService extends MembershipServiceBase protected void setDefaults(Properties properties) { // default values if (properties.getProperty("expirationTime") == null) { - properties.setProperty("expirationTime","5000"); + properties.setProperty("expirationTime", "5000"); } if (properties.getProperty("connectTimeout") == null) { - properties.setProperty("connectTimeout","500"); + properties.setProperty("connectTimeout", "500"); } if (properties.getProperty("rpcTimeout") == null) { - properties.setProperty("rpcTimeout","3000"); + properties.setProperty("rpcTimeout", "3000"); } if (properties.getProperty("useThread") == null) { - properties.setProperty("useThread","false"); + properties.setProperty("useThread", "false"); } if (properties.getProperty("pingInterval") == null) { - properties.setProperty("pingInterval","1000"); + properties.setProperty("pingInterval", "1000"); } } private String getMembershipName() { - return channel.getName()+"-"+"StaticMembership"; + return channel.getName() + "-" + "StaticMembership"; } private void findLocalMember() throws IOException { @@ -242,8 +240,8 @@ public class StaticMembershipService extends MembershipServiceBase // find local member from static members for (StaticMember staticMember : this.staticMembers) { - if (Arrays.equals(InetAddress.getByName(listenHost).getAddress(), staticMember.getHost()) - && Integer.parseInt(listenPort) == staticMember.getPort()) { + if (Arrays.equals(InetAddress.getByName(listenHost).getAddress(), staticMember.getHost()) && + Integer.parseInt(listenPort) == staticMember.getPort()) { this.localMember = staticMember; break; } diff --git a/java/org/apache/catalina/tribes/membership/cloud/AbstractStreamProvider.java b/java/org/apache/catalina/tribes/membership/cloud/AbstractStreamProvider.java index 68785e3243..5f99a22800 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/AbstractStreamProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/AbstractStreamProvider.java @@ -46,18 +46,20 @@ public abstract class AbstractStreamProvider implements StreamProvider { private static final Log log = LogFactory.getLog(AbstractStreamProvider.class); protected static final StringManager sm = StringManager.getManager(AbstractStreamProvider.class); - protected static final TrustManager[] INSECURE_TRUST_MANAGERS = new TrustManager[] { - new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } - }; + protected static final TrustManager[] INSECURE_TRUST_MANAGERS = new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } }; /** * @return the socket factory, or null if not needed @@ -66,17 +68,21 @@ public abstract class AbstractStreamProvider implements StreamProvider { /** * Open URL connection to the specified URL. - * @param url the url - * @param headers the headers map + * + * @param url the url + * @param headers the headers map * @param connectTimeout connection timeout in ms - * @param readTimeout read timeout in ms + * @param readTimeout read timeout in ms + * * @return the URL connection + * * @throws IOException when an error occurs */ - public URLConnection openConnection(String url, Map<String, String> headers, int connectTimeout, int readTimeout) throws IOException { + public URLConnection openConnection(String url, Map<String,String> headers, int connectTimeout, int readTimeout) + throws IOException { if (log.isDebugEnabled()) { - log.debug(sm.getString("abstractStream.connection", - getClass().getSimpleName(), url, headers, Integer.toString(connectTimeout), Integer.toString(readTimeout))); + log.debug(sm.getString("abstractStream.connection", getClass().getSimpleName(), url, headers, + Integer.toString(connectTimeout), Integer.toString(readTimeout))); } URLConnection connection; try { @@ -86,7 +92,7 @@ public abstract class AbstractStreamProvider implements StreamProvider { throw new IOException(e); } if (headers != null) { - for (Map.Entry<String, String> entry : headers.entrySet()) { + for (Map.Entry<String,String> entry : headers.entrySet()) { connection.addRequestProperty(entry.getKey(), entry.getValue()); } } @@ -100,13 +106,14 @@ public abstract class AbstractStreamProvider implements StreamProvider { } @Override - public InputStream openStream(String url, Map<String, String> headers, - int connectTimeout, int readTimeout) throws IOException { + public InputStream openStream(String url, Map<String,String> headers, int connectTimeout, int readTimeout) + throws IOException { URLConnection connection = openConnection(url, headers, connectTimeout, readTimeout); if (connection instanceof HttpsURLConnection) { ((HttpsURLConnection) connection).setSSLSocketFactory(getSocketFactory()); if (log.isTraceEnabled()) { - log.trace(String.format("Using HttpsURLConnection with SSLSocketFactory [%s] for url [%s].", getSocketFactory(), url)); + log.trace(String.format("Using HttpsURLConnection with SSLSocketFactory [%s] for url [%s].", + getSocketFactory(), url)); } } else { if (log.isTraceEnabled()) { @@ -131,7 +138,8 @@ public abstract class AbstractStreamProvider implements StreamProvider { trustStore.setCertificateEntry(alias, cert); } - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + TrustManagerFactory trustManagerFactory = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(trustStore); return trustManagerFactory.getTrustManagers(); diff --git a/java/org/apache/catalina/tribes/membership/cloud/CertificateStreamProvider.java b/java/org/apache/catalina/tribes/membership/cloud/CertificateStreamProvider.java index c5777d1d73..8ee9396af2 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/CertificateStreamProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/CertificateStreamProvider.java @@ -41,7 +41,8 @@ public class CertificateStreamProvider extends AbstractStreamProvider { private final SSLSocketFactory factory; - CertificateStreamProvider(String clientCertFile, String clientKeyFile, String clientKeyPassword, String clientKeyAlgo, String caCertFile) throws Exception { + CertificateStreamProvider(String clientCertFile, String clientKeyFile, String clientKeyPassword, + String clientKeyAlgo, String caCertFile) throws Exception { char[] password = (clientKeyPassword != null) ? clientKeyPassword.toCharArray() : new char[0]; KeyManager[] keyManagers = configureClientCert(clientCertFile, clientKeyFile, password, clientKeyAlgo); TrustManager[] trustManagers = configureCaCert(caCertFile); @@ -55,21 +56,23 @@ public class CertificateStreamProvider extends AbstractStreamProvider { return factory; } - private static KeyManager[] configureClientCert(String clientCertFile, String clientKeyFile, char[] clientKeyPassword, String clientKeyAlgo) throws Exception { + private static KeyManager[] configureClientCert(String clientCertFile, String clientKeyFile, + char[] clientKeyPassword, String clientKeyAlgo) throws Exception { try (InputStream certInputStream = new FileInputStream(clientCertFile)) { CertificateFactory certFactory = CertificateFactory.getInstance("X509"); - X509Certificate cert = (X509Certificate)certFactory.generateCertificate(certInputStream); + X509Certificate cert = (X509Certificate) certFactory.generateCertificate(certInputStream); PEMFile pemFile = new PEMFile(clientKeyFile, new String(clientKeyPassword), clientKeyAlgo); PrivateKey privKey = pemFile.getPrivateKey(); KeyStore keyStore = KeyStore.getInstance("JKS"); - keyStore.load(null, null); + keyStore.load(null, null); String alias = cert.getSubjectX500Principal().getName(); - keyStore.setKeyEntry(alias, privKey, clientKeyPassword, new Certificate[]{cert}); + keyStore.setKeyEntry(alias, privKey, clientKeyPassword, new Certificate[] { cert }); - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyManagerFactory keyManagerFactory = + KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); keyManagerFactory.init(keyStore, clientKeyPassword); return keyManagerFactory.getKeyManagers(); diff --git a/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipProvider.java b/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipProvider.java index fe5e193583..45fb27dd6f 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipProvider.java @@ -51,7 +51,7 @@ public abstract class CloudMembershipProvider extends MembershipProviderBase imp protected Instant startTime; protected MessageDigest md5; - protected Map<String, String> headers = new HashMap<>(); + protected Map<String,String> headers = new HashMap<>(); protected String localIp; protected int port; @@ -68,7 +68,9 @@ public abstract class CloudMembershipProvider extends MembershipProviderBase imp /** * Get value of environment variable. + * * @param keys the environment variables + * * @return the env variables values, or null if not found */ protected static String getEnv(String... keys) { @@ -83,8 +85,9 @@ public abstract class CloudMembershipProvider extends MembershipProviderBase imp } /** - * Get the Kubernetes namespace, or "tomcat" if the Kubernetes environment variable - * cannot be found (with a warning log about the missing namespace). + * Get the Kubernetes namespace, or "tomcat" if the Kubernetes environment variable cannot be found (with a warning + * log about the missing namespace). + * * @return the namespace */ protected String getNamespace() { @@ -138,22 +141,24 @@ public abstract class CloudMembershipProvider extends MembershipProviderBase imp /** * Fetch current cluster members from the cloud orchestration. + * * @return the member array */ protected abstract Member[] fetchMembers(); /** * Add or remove specified member. + * * @param member the member to add - * @param add true if the member is added, false otherwise + * @param add true if the member is added, false otherwise */ protected void updateMember(Member member, boolean add) { if (add && !membership.memberAlive(member)) { return; } if (log.isDebugEnabled()) { - String message = add ? sm.getString("cloudMembershipProvider.add", member) - : sm.getString("cloudMembershipProvider.remove", member); + String message = add ? sm.getString("cloudMembershipProvider.add", member) : + sm.getString("cloudMembershipProvider.remove", member); log.debug(message); } Runnable r = () -> { diff --git a/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipService.java b/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipService.java index 19c5e72362..764bf21f76 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipService.java +++ b/java/org/apache/catalina/tribes/membership/cloud/CloudMembershipService.java @@ -30,17 +30,16 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * A {@link org.apache.catalina.tribes.MembershipService} that uses Kubernetes API(default) or DNS to retrieve - * the members of a cluster.<br> + * A {@link org.apache.catalina.tribes.MembershipService} that uses Kubernetes API(default) or DNS to retrieve the + * members of a cluster.<br> * <p> - * The default implementation of the MembershipProvider component is the {@link KubernetesMembershipProvider}. - * The MembershipProvider can be configured by the <code>membershipProviderClassName</code> property. - * Possible shortcuts are {@code kubernetes} and {@code dns}. For dns look at the {@link DNSMembershipProvider}. + * The default implementation of the MembershipProvider component is the {@link KubernetesMembershipProvider}. The + * MembershipProvider can be configured by the <code>membershipProviderClassName</code> property. Possible shortcuts are + * {@code kubernetes} and {@code dns}. For dns look at the {@link DNSMembershipProvider}. * </p> * <p> * <strong>Configuration example</strong> * </p> - * * {@code server.xml } * * <pre> @@ -61,11 +60,9 @@ import org.apache.juli.logging.LogFactory; * ... * } * </pre> - * */ -public class CloudMembershipService extends MembershipServiceBase - implements CloudMembershipServiceMBean { +public class CloudMembershipService extends MembershipServiceBase implements CloudMembershipServiceMBean { private static final Log log = LogFactory.getLog(CloudMembershipService.class); protected static final StringManager sm = StringManager.getManager(CloudMembershipService.class); @@ -73,8 +70,10 @@ public class CloudMembershipService extends MembershipServiceBase public static final String MEMBERSHIP_PROVIDER_CLASS_NAME = "membershipProviderClassName"; private static final String KUBE = "kubernetes"; private static final String DNS = "dns"; - private static final String KUBE_PROVIDER_CLASS = "org.apache.catalina.tribes.membership.cloud.KubernetesMembershipProvider"; - private static final String DNS_PROVIDER_CLASS = "org.apache.catalina.tribes.membership.cloud.DNSMembershipProvider"; + private static final String KUBE_PROVIDER_CLASS = + "org.apache.catalina.tribes.membership.cloud.KubernetesMembershipProvider"; + private static final String DNS_PROVIDER_CLASS = + "org.apache.catalina.tribes.membership.cloud.DNSMembershipProvider"; protected static final byte[] INITIAL_ID = new byte[16]; private MembershipProvider membershipProvider; @@ -87,7 +86,9 @@ public class CloudMembershipService extends MembershipServiceBase /** * Return a property. + * * @param name the property name + * * @return the property value */ public Object getProperty(String name) { @@ -96,8 +97,10 @@ public class CloudMembershipService extends MembershipServiceBase /** * Set a property. - * @param name the property name + * + * @param name the property name * @param value the property value + * * @return <code>true</code> if the property was successfully set */ public boolean setProperty(String name, String value) { @@ -106,6 +109,7 @@ public class CloudMembershipService extends MembershipServiceBase /** * Return the membership provider class. + * * @return the classname */ public String getMembershipProviderClassName() { @@ -114,6 +118,7 @@ public class CloudMembershipService extends MembershipServiceBase /** * Set the membership provider class. + * * @param membershipProviderClassName the class name */ public void setMembershipProviderClassName(String membershipProviderClassName) { @@ -142,8 +147,7 @@ public class CloudMembershipService extends MembershipServiceBase if (log.isTraceEnabled()) { log.trace("Using membershipProvider: " + provider); } - membershipProvider = - (MembershipProvider) Class.forName(provider).getConstructor().newInstance(); + membershipProvider = (MembershipProvider) Class.forName(provider).getConstructor().newInstance(); membershipProvider.setMembershipListener(this); membershipProvider.setMembershipService(this); membershipProvider.init(properties); @@ -183,8 +187,8 @@ public class CloudMembershipService extends MembershipServiceBase @Override public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) { if (log.isTraceEnabled()) { - log.trace(String.format("setLocalMemberProperties(%s, %d, %d, %d)", listenHost, - Integer.valueOf(listenPort), Integer.valueOf(securePort), Integer.valueOf(udpPort))); + log.trace(String.format("setLocalMemberProperties(%s, %d, %d, %d)", listenHost, Integer.valueOf(listenPort), + Integer.valueOf(securePort), Integer.valueOf(udpPort))); } properties.setProperty("tcpListenHost", listenHost); properties.setProperty("tcpListenPort", String.valueOf(listenPort)); diff --git a/java/org/apache/catalina/tribes/membership/cloud/DNSMembershipProvider.java b/java/org/apache/catalina/tribes/membership/cloud/DNSMembershipProvider.java index 9822487cb1..c141d78529 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/DNSMembershipProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/DNSMembershipProvider.java @@ -33,11 +33,9 @@ import org.apache.juli.logging.LogFactory; /** * A {@link org.apache.catalina.tribes.MembershipProvider} that uses DNS to retrieve the members of a cluster.<br> - * * <p> * <strong>Configuration example for Kubernetes</strong> * </p> - * * {@code server.xml } * * <pre> @@ -82,7 +80,6 @@ import org.apache.juli.logging.LogFactory; * </pre> * * Environment variable configuration<br> - * * {@code DNS_MEMBERSHIP_SERVICE_NAME=my-tomcat-app-membership } */ @@ -138,7 +135,8 @@ public class DNSMembershipProvider extends CloudMembershipProvider { if (ip.equals(localIp)) { // Update the UID on initial lookup Member localMember = service.getLocalMember(false); - if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID && localMember instanceof MemberImpl) { + if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID && + localMember instanceof MemberImpl) { ((MemberImpl) localMember).setUniqueId(id); } continue; @@ -166,8 +164,7 @@ public class DNSMembershipProvider extends CloudMembershipProvider { Member[] members = membership.getMembers(); if (members != null) { for (Member member : members) { - if (Arrays.equals(sender.getHost(), member.getHost()) - && sender.getPort() == member.getPort()) { + if (Arrays.equals(sender.getHost(), member.getHost()) && sender.getPort() == member.getPort()) { found = true; break; } diff --git a/java/org/apache/catalina/tribes/membership/cloud/KubernetesMembershipProvider.java b/java/org/apache/catalina/tribes/membership/cloud/KubernetesMembershipProvider.java index 6f0f4f244e..4d4d35e5bd 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/KubernetesMembershipProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/KubernetesMembershipProvider.java @@ -38,8 +38,8 @@ import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.json.JSONParser; /** - * A {@link org.apache.catalina.tribes.MembershipProvider} that uses Kubernetes API to retrieve the members of a cluster.<br> - * + * A {@link org.apache.catalina.tribes.MembershipProvider} that uses Kubernetes API to retrieve the members of a + * cluster.<br> */ public class KubernetesMembershipProvider extends CloudMembershipProvider { @@ -64,7 +64,8 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { String masterHost = getEnv(CUSTOM_ENV_PREFIX + "MASTER_HOST", "KUBERNETES_SERVICE_HOST"); String masterPort = getEnv(CUSTOM_ENV_PREFIX + "MASTER_PORT", "KUBERNETES_SERVICE_PORT"); - String clientCertificateFile = getEnv(CUSTOM_ENV_PREFIX + "CLIENT_CERT_FILE", "KUBERNETES_CLIENT_CERTIFICATE_FILE"); + String clientCertificateFile = + getEnv(CUSTOM_ENV_PREFIX + "CLIENT_CERT_FILE", "KUBERNETES_CLIENT_CERTIFICATE_FILE"); String caCertFile = getEnv(CUSTOM_ENV_PREFIX + "CA_CERT_FILE", "KUBERNETES_CA_CERTIFICATE_FILE"); if (caCertFile == null) { caCertFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"; @@ -98,7 +99,8 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { if (clientKeyAlgo == null) { clientKeyAlgo = "RSA"; } - streamProvider = new CertificateStreamProvider(clientCertificateFile, clientKeyFile, clientKeyPassword, clientKeyAlgo, caCertFile); + streamProvider = new CertificateStreamProvider(clientCertificateFile, clientKeyFile, clientKeyPassword, + clientKeyAlgo, caCertFile); } String ver = getEnv(CUSTOM_ENV_PREFIX + "API_VERSION", "KUBERNETES_API_VERSION"); @@ -151,7 +153,7 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { protected void parsePods(Reader reader, List<MemberImpl> members) { JSONParser parser = new JSONParser(reader); try { - LinkedHashMap<String, Object> json = parser.object(); + LinkedHashMap<String,Object> json = parser.object(); Object itemsObject = json.get("items"); if (!(itemsObject instanceof List<?>)) { log.error(sm.getString("kubernetesMembershipProvider.invalidPodsList", "no items")); @@ -159,11 +161,11 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { } List<Object> items = (List<Object>) itemsObject; for (Object podObject : items) { - if (!(podObject instanceof LinkedHashMap<?, ?>)) { + if (!(podObject instanceof LinkedHashMap<?,?>)) { log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "item")); continue; } - LinkedHashMap<String, Object> pod = (LinkedHashMap<String, Object>) podObject; + LinkedHashMap<String,Object> pod = (LinkedHashMap<String,Object>) podObject; // If there is a "kind", check it is "Pod" Object podKindObject = pod.get("kind"); if (podKindObject != null && !"Pod".equals(podKindObject)) { @@ -171,11 +173,11 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { } // "metadata" contains "name", "uid" and "creationTimestamp" Object metadataObject = pod.get("metadata"); - if (!(metadataObject instanceof LinkedHashMap<?, ?>)) { + if (!(metadataObject instanceof LinkedHashMap<?,?>)) { log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "metadata")); continue; } - LinkedHashMap<String, Object> metadata = (LinkedHashMap<String, Object>) metadataObject; + LinkedHashMap<String,Object> metadata = (LinkedHashMap<String,Object>) metadataObject; Object nameObject = metadata.get("name"); if (nameObject == null) { log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "name")); @@ -189,11 +191,11 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { } // "status" contains "phase" (which must be "Running") and "podIP" Object statusObject = pod.get("status"); - if (!(statusObject instanceof LinkedHashMap<?, ?>)) { + if (!(statusObject instanceof LinkedHashMap<?,?>)) { log.warn(sm.getString("kubernetesMembershipProvider.invalidPod", "status")); continue; } - LinkedHashMap<String, Object> status = (LinkedHashMap<String, Object>) statusObject; + LinkedHashMap<String,Object> status = (LinkedHashMap<String,Object>) statusObject; if (!"Running".equals(status.get("phase"))) { continue; } @@ -209,14 +211,16 @@ public class KubernetesMembershipProvider extends CloudMembershipProvider { if (podIP.equals(localIp)) { // Update the UID on initial lookup Member localMember = service.getLocalMember(false); - if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID && localMember instanceof MemberImpl) { + if (localMember.getUniqueId() == CloudMembershipService.INITIAL_ID && + localMember instanceof MemberImpl) { byte[] id = md5.digest(uid.getBytes(StandardCharsets.US_ASCII)); ((MemberImpl) localMember).setUniqueId(id); } continue; } - long aliveTime = Duration.between(Instant.parse(creationTimestampObject.toString()), startTime).toMillis(); + long aliveTime = + Duration.between(Instant.parse(creationTimestampObject.toString()), startTime).toMillis(); MemberImpl member = null; try { diff --git a/java/org/apache/catalina/tribes/membership/cloud/StreamProvider.java b/java/org/apache/catalina/tribes/membership/cloud/StreamProvider.java index 7488864467..9ef3637f2b 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/StreamProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/StreamProvider.java @@ -23,12 +23,16 @@ import java.util.Map; public interface StreamProvider { /** * Open stream to the specified URL. - * @param url the url - * @param headers the headers map + * + * @param url the url + * @param headers the headers map * @param connectTimeout connection timeout in ms - * @param readTimeout read timeout in ms + * @param readTimeout read timeout in ms + * * @return the stream + * * @throws IOException when an error occurs */ - InputStream openStream(String url, Map<String, String> headers, int connectTimeout, int readTimeout) throws IOException; + InputStream openStream(String url, Map<String,String> headers, int connectTimeout, int readTimeout) + throws IOException; } diff --git a/java/org/apache/catalina/tribes/membership/cloud/TokenStreamProvider.java b/java/org/apache/catalina/tribes/membership/cloud/TokenStreamProvider.java index b44ba1eeca..0c66e5e4d9 100644 --- a/java/org/apache/catalina/tribes/membership/cloud/TokenStreamProvider.java +++ b/java/org/apache/catalina/tribes/membership/cloud/TokenStreamProvider.java @@ -43,7 +43,7 @@ public class TokenStreamProvider extends AbstractStreamProvider { } @Override - public InputStream openStream(String url, Map<String, String> headers, int connectTimeout, int readTimeout) + public InputStream openStream(String url, Map<String,String> headers, int connectTimeout, int readTimeout) throws IOException { // Set token header if (token != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org