Author: kfujino Date: Thu Jul 26 09:27:35 2018 New Revision: 1836707 URL: http://svn.apache.org/viewvc?rev=1836707&view=rev Log: Add New Static Membership Service implementations. - initial implementation that still has a lot of TODOs.
Added: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java (with props) tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java (with props) Added: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java?rev=1836707&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java Thu Jul 26 09:27:35 2018 @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.catalina.tribes.membership; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelException.FaultyMember; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Heartbeat; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipService; +import org.apache.catalina.tribes.group.Response; +import org.apache.catalina.tribes.group.RpcCallback; +import org.apache.catalina.tribes.group.RpcChannel; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.util.StringManager; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +public class StaticMembershipProvider extends MembershipProviderBase implements RpcCallback, ChannelListener, Heartbeat { + + protected static final StringManager sm = StringManager.getManager(StaticMembershipProvider.class); + private static final Log log = LogFactory.getLog(StaticMembershipProvider.class); + + protected Channel channel; + protected RpcChannel rpcChannel; + protected MembershipService service; + private String membershipName = null; + private byte[] membershipId = null; + protected ArrayList<StaticMember> staticMembers; + protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS; + protected long expirationTime = 5000; + protected int connectTimeout = 500; + protected long rpcTimeout = 3000; + protected int startLevel = 0; + + @Override + public void init(Properties properties) throws Exception { + String expirationTimeStr = properties.getProperty("expirationTime"); + this.expirationTime = Long.parseLong(expirationTimeStr); + String connectTimeoutStr = 
properties.getProperty("connectTimeout"); + this.connectTimeout = Integer.parseInt(connectTimeoutStr); + String rpcTimeouStr = properties.getProperty("rpcTimeout"); + this.rpcTimeout = Long.parseLong(rpcTimeouStr); + this.membershipName = properties.getProperty("membershipName");; + this.membershipId = membershipName.getBytes(StandardCharsets.ISO_8859_1); + membership = new Membership(service.getLocalMember(true)); + this.rpcChannel = new RpcChannel(this.membershipId, channel, this); + this.channel.addChannelListener(this); + } + + @Override + public void start(int level) throws Exception { + if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) { + //no-op + } + if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) { + //no-op + } + startLevel = (startLevel | level); + if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) { + startMembership(getAliveMembers(staticMembers.toArray(new Member[0]))); + } + } + + @Override + public boolean stop(int level) throws Exception { + if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) { + // no-op + } + if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) { + // no-op + } + startLevel = (startLevel & (~level)); + if ( startLevel == 0 ) { + if (this.rpcChannel != null) { + this.rpcChannel.breakdown(); + } + if (this.channel != null) { + try { + stopMembership(this.getMembers()); + } catch ( Exception ignore){} + this.channel.removeChannelListener(this); + this.channel = null; + } + this.rpcChannel = null; + this.membership.reset(); + } + return (startLevel == 0); + } + + protected void startMembership(Member[] members) throws ChannelException { + if (members.length == 0) return; + MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_START, service.getLocalMember(true)); + Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); + if (resp.length > 0) { + for (int i = 0; i < resp.length; i++) { + messageReceived(resp[i].getMessage(), resp[i].getSource()); + } + } 
else { + log.warn("no response"); + } + } + + protected Member setupMember(Member mbr) { + // no-op + return mbr; + } + + protected void memberAdded(Member member) { + Member mbr = setupMember(member); + if(membership.memberAlive(mbr)) { + // TODO invoke thread + membershipListener.memberAdded(mbr); + } + } + + protected void memberDisappeared(Member member) { + membership.removeMember(member); + // TODO invoke thread + membershipListener.memberDisappeared(member); + } + + protected void memberAlive(Member member) { + if (!membership.contains(member)) memberAdded(member); + membership.memberAlive(member); + } + + protected void stopMembership(Member[] members) { + if (members.length == 0 ) return; + MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_STOP, service.getLocalMember(true)); + try { + channel.send(members, msg, sendOptions); + } catch (ChannelException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void messageReceived(Serializable msg, Member sender) { + MemberMessage memMsg = (MemberMessage) msg; + Member member = memMsg.getMember(); + if (memMsg.getMsgtype() == MemberMessage.MSG_START) { + memberAdded(member); + } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) { + memberDisappeared(member); + } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { + memberAlive(member); + } + } + + @Override + public boolean accept(Serializable msg, Member sender) { + boolean result = false; + if (msg instanceof MemberMessage) { + result = Arrays.equals(this.membershipId, ((MemberMessage) msg).getMembershipId()); + } + return result; + } + + @Override + public Serializable replyRequest(Serializable msg, final Member sender) { + if (!(msg instanceof MemberMessage)) return null; + MemberMessage memMsg = (MemberMessage) msg; + if (memMsg.getMsgtype() == MemberMessage.MSG_START) { + messageReceived(memMsg, sender); + memMsg.setMember(service.getLocalMember(true)); + return memMsg; + } else if 
(memMsg.getMsgtype() == MemberMessage.MSG_PING) { + messageReceived(memMsg, sender); + memMsg.setMember(service.getLocalMember(true)); + return memMsg; + } else { + // other messages are ignored. + if (log.isInfoEnabled()) + log.info("never happen"); + return null; + } + } + + @Override + public void leftOver(Serializable msg, Member sender) { + if (!(msg instanceof MemberMessage)) return; + MemberMessage memMsg = (MemberMessage) msg; + if (memMsg.getMsgtype() == MemberMessage.MSG_START) { + //TODO + } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { + //TODO + } else { + // other messages are ignored. + if (log.isInfoEnabled()) + log.info("never happen"); + } + } + + @Override + public void heartbeat() { + try { + ping(); + } catch (ChannelException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + protected void ping() throws ChannelException { + // send ping + Member[] members = getAliveMembers(staticMembers.toArray(new Member[0])); + if (members.length == 0) return; + try { + MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true)); + Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); + for (int i = 0; i < resp.length; i++) { + messageReceived(resp[i].getMessage(), resp[i].getSource()); + } + } catch (ChannelException ce) { + // Handle known failed members + FaultyMember[] faultyMembers = ce.getFaultyMembers(); + for (FaultyMember faultyMember : faultyMembers) { + memberDisappeared(faultyMember.getMember()); + } + throw ce; + } + // expire + checkExpired(); + } + + protected void checkExpired() { + Member[] expired = membership.expire(expirationTime); + for (Member member : expired) { + membershipListener.memberDisappeared(member); + } + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + public void setMembershipService(MembershipService service) { + this.service = service; + } + + public void 
setStaticMembers(ArrayList<StaticMember> staticMembers) { + this.staticMembers = staticMembers; + } + + private Member[] getAliveMembers(Member[] members) { + List<Member> aliveMembers = new ArrayList<>(); + for (Member member : members) { + try (Socket socket = new Socket()) { + InetAddress ia = InetAddress.getByAddress(member.getHost()); + InetSocketAddress addr = new InetSocketAddress(ia, member.getPort()); + socket.connect(addr, connectTimeout); + aliveMembers.add(member); + } catch (Exception x) {//no-op + } + } + return aliveMembers.toArray(new Member[0]); + } + + // ------------------------------------------------------------------------------ + // member message to send to and from other memberships + // ------------------------------------------------------------------------------ + public static class MemberMessage implements Serializable { + private static final long serialVersionUID = 1L; + public static final int MSG_START = 1; + public static final int MSG_STOP = 2; + public static final int MSG_PING = 3; + private final int msgtype; + private final byte[] membershipId; + private Member member; + + public MemberMessage(byte[] membershipId, int msgtype, Member member) { + this.membershipId = membershipId; + this.msgtype = msgtype; + this.member = member; + } + + public int getMsgtype() { + return msgtype; + } + + public byte[] getMembershipId() { + return membershipId; + } + + public Member getMember() { + return member; + } + + public void setMember(Member local) { + this.member = local; + } + + @Override + public String toString() { + return super.toString(); + } + } + +} \ No newline at end of file Propchange: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java?rev=1836707&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java Thu Jul 26 09:27:35 2018 @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.catalina.tribes.membership; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipProvider; +import org.apache.catalina.tribes.util.StringManager; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +public class StaticMembershipService extends MembershipServiceBase { + private static final Log log = LogFactory.getLog(StaticMembershipService.class); + protected static final StringManager sm = StringManager.getManager(Constants.Package); + + protected final ArrayList<StaticMember> staticMembers = new ArrayList<>(); + private StaticMember localMember; + private StaticMembershipProvider provider; + + public StaticMembershipService() { + //default values + setDefaults(this.properties); + } + + @Override + public void start(int level) throws Exception { + if (provider != null) { + provider.start(level); + return; + } + localMember.setServiceStartTime(System.currentTimeMillis()); + localMember.setMemberAliveTime(100); + // build membership provider + if (provider == null) { + provider = buildMembershipProvider(); + } + provider.start(level); + // TODO JMX register + } + + protected StaticMembershipProvider buildMembershipProvider() throws Exception { + StaticMembershipProvider provider = new StaticMembershipProvider(); + provider.setChannel(channel); + provider.setMembershipListener(this); + provider.setMembershipService(this); + provider.setStaticMembers(staticMembers); + properties.setProperty("membershipName", getMembershipName()); + provider.init(properties); + return provider; + } + + @Override + public void stop(int level) { + try { + if (provider != null && provider.stop(level)) { + // TODO JMX unregister + provider = null; + channel = null;; + } + } catch (Exception e) { + //TODO + log.error("stop failed", e); + } + } + + @Override + public Member getLocalMember(boolean 
incAliveTime) { + if ( incAliveTime && localMember != null) { + localMember.setMemberAliveTime(System.currentTimeMillis()-localMember.getServiceStartTime()); + } + return localMember; + } + + @Override + public void setLocalMemberProperties(String listenHost, int listenPort, + int securePort, int udpPort) { + try { + localMember.setHostname(listenHost); + localMember.setPort(listenPort); + localMember.setSecurePort(securePort); + localMember.setUdpPort(udpPort); + localMember.getData(true, true); + } catch (IOException x) { + throw new IllegalArgumentException(x); + } + } + + @Override + public void setPayload(byte[] payload) { + // no-op + } + + @Override + public void setDomain(byte[] domain) { + // no-op + } + + @Override + public MembershipProvider getMembershipProvider() { + return provider; + } + + public ArrayList<StaticMember> getStaticMembers() { + return staticMembers; + } + + public void addStaticMember(StaticMember member) { + staticMembers.add(member); + } + + public void removeStaticMember(StaticMember member) { + staticMembers.remove(member); + } + + public void setLocalMember(StaticMember member) { + this.localMember = member; + localMember.setLocal(true); + } + + public long getExpirationTime() { + String expirationTime = properties.getProperty("expirationTime"); + return Long.parseLong(expirationTime); + } + + public void setExpirationTime(long expirationTime) { + properties.setProperty("expirationTime", String.valueOf(expirationTime)); + } + + public int getConnectTimeout() { + String connectTimeout = properties.getProperty("connectTimeout"); + return Integer.parseInt(connectTimeout); + } + + public void setConnectTimeout(int connectTimeout) { + properties.setProperty("connectTimeout", String.valueOf(connectTimeout)); + } + + public long getRpcTimeout() { + String rpcTimeout = properties.getProperty("rpcTimeout"); + return Long.parseLong(rpcTimeout); + } + + public void setRpcTimeout(long rpcTimeout) { + properties.setProperty("rpcTimeout", 
String.valueOf(rpcTimeout)); + } + + @Override + public void setProperties(Properties properties) { + setDefaults(properties); + this.properties = properties; + } + + protected void setDefaults(Properties properties) { + // default values + if (properties.getProperty("expirationTime") == null) + properties.setProperty("expirationTime","5000"); + if (properties.getProperty("connectTimeout") == null) + properties.setProperty("connectTimeout","500"); + if (properties.getProperty("rpcTimeout") == null) + properties.setProperty("rpcTimeout","3000"); + } + + private String getMembershipName() { + return channel.getName()+"-"+"StaticMembership"; + } +} \ No newline at end of file Propchange: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org