Author: fhanik Date: Thu Jun 29 03:08:57 2006 New Revision: 417983 URL: http://svn.apache.org/viewvc?rev=417983&view=rev Log: Peter was right all along, domain name filtering becomes really useful, since trying to guess an available multicast address is not that safe
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestDomainFilter.java Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=417983&r1=417982&r2=417983&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original) +++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Thu Jun 29 03:08:57 2006 @@ -1,6 +1,7 @@ 0.9.3.3 - Added Member.getCommand, to separate out internal tribes logic from application payload - when setting payload, the update should get sent immediately + - domain name filter, using an interceptor if we wish 0.9.3.2 - MemberImpl.toString has a limit on the size it prints out 0.9.3.1 Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=417983&r1=417982&r2=417983&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Thu Jun 29 03:08:57 2006 @@ -100,4 +100,10 @@ * @return byte[] */ public byte[] getCommand(); + + /** + * Domain for this cluster + * @return byte[] + */ + public byte[] getDomain(); } Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java?rev=417983&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java Thu Jun 29 03:08:57 2006 @@ -0,0 +1,101 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ +package org.apache.catalina.tribes.group.interceptors; + +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.membership.Membership; +import java.util.Arrays; + +/** + * <p>Title: Member domain filter interceptor </p> + * + * <p>Description: Filters membership based on domain. 
+ * </p> + * + * @author Filip Hanik + * @version 1.0 + */ +public class DomainFilterInterceptor extends ChannelInterceptorBase { + + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( DomainFilterInterceptor.class ); + + protected Membership membership = null; + + protected byte[] domain = new byte[0]; + + public void messageReceived(ChannelMessage msg) { + //should we filter incoming based on domain? + super.messageReceived(msg); + }//messageReceived + + + public void memberAdded(Member member) { + if ( membership == null ) setupMembership(); + boolean notify = false; + synchronized (membership) { + notify = Arrays.equals(domain,member.getDomain()); + if ( notify ) notify = membership.memberAlive((MemberImpl)member); + } + if ( notify ) super.memberAdded(member); + } + + public void memberDisappeared(Member member) { + if ( membership == null ) setupMembership(); + boolean notify = false; + synchronized (membership) { + notify = Arrays.equals(domain,member.getDomain()); + membership.removeMember((MemberImpl)member); + } + if ( notify ) super.memberDisappeared(member); + } + + public boolean hasMembers() { + if ( membership == null ) setupMembership(); + return membership.hasMembers(); + } + + public Member[] getMembers() { + if ( membership == null ) setupMembership(); + return membership.getMembers(); + } + + public Member getMember(Member mbr) { + if ( membership == null ) setupMembership(); + return membership.getMember(mbr); + } + + public Member getLocalMember(boolean incAlive) { + return super.getLocalMember(incAlive); + } + + + protected synchronized void setupMembership() { + if ( membership == null ) { + membership = new Membership((MemberImpl)super.getLocalMember(true)); + } + + } + + public byte[] getDomain() { + return domain; + } + + public void setDomain(byte[] domain) { + this.domain = domain; + } +} Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=417983&r1=417982&r2=417983&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Thu Jun 29 03:08:57 2006 @@ -91,6 +91,11 @@ protected byte[] command = new byte[0]; /** + * Domain if we want to filter based on domain. + */ + protected byte[] domain = new byte[0]; + + /** * Empty constructor for serialization */ public MemberImpl() { @@ -158,7 +163,7 @@ public int getDataLength() { - return 8+4+1+host.length+4+command.length+16+4+payload.length; + return 8+4+1+host.length+4+command.length+4+domain.length+16+4+payload.length; } /** @@ -187,6 +192,8 @@ //host - hl bytes //clen - 4 bytes //command - clen bytes + //dlen - 4 bytes + //domain - dlen bytes //uniqueId - 16 bytes //payload length - 4 bytes //payload plen bytes @@ -212,6 +219,12 @@ //command - clen bytes System.arraycopy(command,0,data,pos,command.length); pos+=command.length; + //dlen - 4 bytes + XByteBuffer.toBytes(domain.length,data,pos); + pos+=4; + //domain - dlen bytes + System.arraycopy(domain,0,data,pos,domain.length); + pos+=domain.length; //unique Id System.arraycopy(uniqueId,0,data,pos,uniqueId.length); pos+=uniqueId.length; @@ -238,6 +251,8 @@ //host - hl bytes //command length - 4 bytes //command clen bytes + //domain length - 4 bytes + //domain - dlen bytes //uniqueId - 16 bytes //payload length - 4bytes //payload - pl bytes @@ -262,6 +277,13 @@ System.arraycopy(data, pos, command, 0, command.length); pos+=command.length; + int dl = XByteBuffer.toInt(data,pos); + pos+=4; + + byte[] domain 
= new byte[cl]; + System.arraycopy(data, pos, domain, 0, domain.length); + pos+=domain.length; + byte[] uniqueId = new byte[16]; System.arraycopy(data, pos, uniqueId, 0, 16); pos+=16; @@ -278,6 +300,7 @@ member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); member.setUniqueId(uniqueId); member.payload = payload; + member.domain = domain; member.dataPkg = new byte[data.length]; System.arraycopy(data,0,member.dataPkg,0,data.length); @@ -351,6 +374,10 @@ return command; } + public byte[] getDomain() { + return domain; + } + public void setMemberAliveTime(long time) { memberAliveTime=time; } @@ -443,6 +470,10 @@ public void setCommand(byte[] command) { this.command = command!=null?command:new byte[0]; + } + + public void setDomain(byte[] domain) { + this.domain = domain!=null?domain:new byte[0]; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestDomainFilter.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestDomainFilter.java?rev=417983&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestDomainFilter.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/membership/TestDomainFilter.java Thu Jun 29 03:08:57 2006 @@ -0,0 +1,104 @@ +package org.apache.catalina.tribes.test.membership; + +import java.util.ArrayList; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.GroupChannel; +import junit.framework.TestCase; +import 
org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; +import org.apache.catalina.tribes.util.UUIDGenerator; + +public class TestDomainFilter + extends TestCase { + private static int count = 10; + private ManagedChannel[] channels = new ManagedChannel[count]; + private TestMbrListener[] listeners = new TestMbrListener[count]; + + protected void setUp() throws Exception { + super.setUp(); + for (int i = 0; i < channels.length; i++) { + channels[i] = new GroupChannel(); + channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII")); + listeners[i] = new TestMbrListener( ("Listener-" + (i + 1))); + channels[i].addMembershipListener(listeners[i]); + DomainFilterInterceptor filter = new DomainFilterInterceptor(); + filter.setDomain(UUIDGenerator.randomUUID(false)); + channels[i].addInterceptor(filter); + } + } + + public void clear() { + for (int i = 0; i < channels.length; i++) { + listeners[i].members.clear(); + } + } + + public void testMemberArrival() throws Exception { + //purpose of this test is to make sure that we have received all the members + //that we can expect before the start method returns + Thread[] threads = new Thread[channels.length]; + for (int i=0; i<channels.length; i++ ) { + final Channel channel = channels[i]; + Thread t = new Thread() { + public void run() { + try { + channel.start(Channel.DEFAULT); + }catch ( Exception x ) { + throw new RuntimeException(x); + } + } + }; + threads[i] = t; + } + for (int i=0; i<threads.length; i++ ) threads[i].start(); + for (int i=0; i<threads.length; i++ ) threads[i].join(); + System.out.println("All channels started."); + for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member arrival length",0,listeners[i].members.size()); + } + + protected void tearDown() throws Exception { + + for (int i = 0; i < channels.length; i++) { + try { + channels[i].stop(Channel.DEFAULT); + } catch (Exception ignore) {} + } + super.tearDown(); + } + + public class 
TestMbrListener + implements MembershipListener { + public String name = null; + public TestMbrListener(String name) { + this.name = name; + } + + public ArrayList members = new ArrayList(); + public void memberAdded(Member member) { + if (!members.contains(member)) { + members.add(member); + try { + System.out.println(name + ":member added[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member added[unknown]"); + } + } + } + + public void memberDisappeared(Member member) { + if (members.contains(member)) { + members.remove(member); + try { + System.out.println(name + ":member disappeared[" + new String(member.getPayload(), "ASCII") + "; Thread:"+Thread.currentThread().getName()+"]"); + } catch (Exception x) { + System.out.println(name + ":member disappeared[unknown]"); + } + } + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]