Author: kfujino
Date: Thu Jul 26 09:27:35 2018
New Revision: 1836707
URL: http://svn.apache.org/viewvc?rev=1836707&view=rev
Log:
Add New Static Membership Service implementations.
- initial implementaion that remain a lot of TODOs.
Added:
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
(with props)
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
(with props)
Added:
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java?rev=1836707&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
(added)
+++
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
Thu Jul 26 09:27:35 2018
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipProvider extends MembershipProviderBase
implements RpcCallback, ChannelListener, Heartbeat {
+
+ protected static final StringManager sm =
StringManager.getManager(StaticMembershipProvider.class);
+ private static final Log log =
LogFactory.getLog(StaticMembershipProvider.class);
+
+ protected Channel channel;
+ protected RpcChannel rpcChannel;
+ protected MembershipService service;
+ private String membershipName = null;
+ private byte[] membershipId = null;
+ protected ArrayList<StaticMember> staticMembers;
+ protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS;
+ protected long expirationTime = 5000;
+ protected int connectTimeout = 500;
+ protected long rpcTimeout = 3000;
+ protected int startLevel = 0;
+
+ @Override
+ public void init(Properties properties) throws Exception {
+ String expirationTimeStr = properties.getProperty("expirationTime");
+ this.expirationTime = Long.parseLong(expirationTimeStr);
+ String connectTimeoutStr = properties.getProperty("connectTimeout");
+ this.connectTimeout = Integer.parseInt(connectTimeoutStr);
+ String rpcTimeouStr = properties.getProperty("rpcTimeout");
+ this.rpcTimeout = Long.parseLong(rpcTimeouStr);
+ this.membershipName = properties.getProperty("membershipName");;
+ this.membershipId =
membershipName.getBytes(StandardCharsets.ISO_8859_1);
+ membership = new Membership(service.getLocalMember(true));
+ this.rpcChannel = new RpcChannel(this.membershipId, channel, this);
+ this.channel.addChannelListener(this);
+ }
+
+ @Override
+ public void start(int level) throws Exception {
+ if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+ //no-op
+ }
+ if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+ //no-op
+ }
+ startLevel = (startLevel | level);
+ if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) {
+ startMembership(getAliveMembers(staticMembers.toArray(new
Member[0])));
+ }
+ }
+
+ @Override
+ public boolean stop(int level) throws Exception {
+ if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+ // no-op
+ }
+ if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+ // no-op
+ }
+ startLevel = (startLevel & (~level));
+ if ( startLevel == 0 ) {
+ if (this.rpcChannel != null) {
+ this.rpcChannel.breakdown();
+ }
+ if (this.channel != null) {
+ try {
+ stopMembership(this.getMembers());
+ } catch ( Exception ignore){}
+ this.channel.removeChannelListener(this);
+ this.channel = null;
+ }
+ this.rpcChannel = null;
+ this.membership.reset();
+ }
+ return (startLevel == 0);
+ }
+
+ protected void startMembership(Member[] members) throws ChannelException {
+ if (members.length == 0) return;
+ MemberMessage msg = new MemberMessage(membershipId,
MemberMessage.MSG_START, service.getLocalMember(true));
+ Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY,
sendOptions, rpcTimeout);
+ if (resp.length > 0) {
+ for (int i = 0; i < resp.length; i++) {
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } else {
+ log.warn("no response");
+ }
+ }
+
+ protected Member setupMember(Member mbr) {
+ // no-op
+ return mbr;
+ }
+
+ protected void memberAdded(Member member) {
+ Member mbr = setupMember(member);
+ if(membership.memberAlive(mbr)) {
+ // TODO invoke thread
+ membershipListener.memberAdded(mbr);
+ }
+ }
+
+ protected void memberDisappeared(Member member) {
+ membership.removeMember(member);
+ // TODO invoke thread
+ membershipListener.memberDisappeared(member);
+ }
+
+ protected void memberAlive(Member member) {
+ if (!membership.contains(member)) memberAdded(member);
+ membership.memberAlive(member);
+ }
+
+ protected void stopMembership(Member[] members) {
+ if (members.length == 0 ) return;
+ MemberMessage msg = new MemberMessage(membershipId,
MemberMessage.MSG_STOP, service.getLocalMember(true));
+ try {
+ channel.send(members, msg, sendOptions);
+ } catch (ChannelException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void messageReceived(Serializable msg, Member sender) {
+ MemberMessage memMsg = (MemberMessage) msg;
+ Member member = memMsg.getMember();
+ if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+ memberAdded(member);
+ } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) {
+ memberDisappeared(member);
+ } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+ memberAlive(member);
+ }
+ }
+
+ @Override
+ public boolean accept(Serializable msg, Member sender) {
+ boolean result = false;
+ if (msg instanceof MemberMessage) {
+ result = Arrays.equals(this.membershipId, ((MemberMessage)
msg).getMembershipId());
+ }
+ return result;
+ }
+
+ @Override
+ public Serializable replyRequest(Serializable msg, final Member sender) {
+ if (!(msg instanceof MemberMessage)) return null;
+ MemberMessage memMsg = (MemberMessage) msg;
+ if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+ messageReceived(memMsg, sender);
+ memMsg.setMember(service.getLocalMember(true));
+ return memMsg;
+ } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+ messageReceived(memMsg, sender);
+ memMsg.setMember(service.getLocalMember(true));
+ return memMsg;
+ } else {
+ // other messages are ignored.
+ if (log.isInfoEnabled())
+ log.info("never happen");
+ return null;
+ }
+ }
+
+ @Override
+ public void leftOver(Serializable msg, Member sender) {
+ if (!(msg instanceof MemberMessage)) return;
+ MemberMessage memMsg = (MemberMessage) msg;
+ if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+ //TODO
+ } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+ //TODO
+ } else {
+ // other messages are ignored.
+ if (log.isInfoEnabled())
+ log.info("never happen");
+ }
+ }
+
+ @Override
+ public void heartbeat() {
+ try {
+ ping();
+ } catch (ChannelException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ protected void ping() throws ChannelException {
+ // send ping
+ Member[] members = getAliveMembers(staticMembers.toArray(new
Member[0]));
+ if (members.length == 0) return;
+ try {
+ MemberMessage msg = new MemberMessage(membershipId,
MemberMessage.MSG_PING, service.getLocalMember(true));
+ Response[] resp = rpcChannel.send(members, msg,
RpcChannel.ALL_REPLY, sendOptions, rpcTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } catch (ChannelException ce) {
+ // Handle known failed members
+ FaultyMember[] faultyMembers = ce.getFaultyMembers();
+ for (FaultyMember faultyMember : faultyMembers) {
+ memberDisappeared(faultyMember.getMember());
+ }
+ throw ce;
+ }
+ // expire
+ checkExpired();
+ }
+
+ protected void checkExpired() {
+ Member[] expired = membership.expire(expirationTime);
+ for (Member member : expired) {
+ membershipListener.memberDisappeared(member);
+ }
+ }
+
+ public void setChannel(Channel channel) {
+ this.channel = channel;
+ }
+
+ public void setMembershipService(MembershipService service) {
+ this.service = service;
+ }
+
+ public void setStaticMembers(ArrayList<StaticMember> staticMembers) {
+ this.staticMembers = staticMembers;
+ }
+
+ private Member[] getAliveMembers(Member[] members) {
+ List<Member> aliveMembers = new ArrayList<>();
+ for (Member member : members) {
+ try (Socket socket = new Socket()) {
+ InetAddress ia = InetAddress.getByAddress(member.getHost());
+ InetSocketAddress addr = new InetSocketAddress(ia,
member.getPort());
+ socket.connect(addr, connectTimeout);
+ aliveMembers.add(member);
+ } catch (Exception x) {//no-op
+ }
+ }
+ return aliveMembers.toArray(new Member[0]);
+ }
+
+ //
------------------------------------------------------------------------------
+ // member message to send to and from other memberships
+ //
------------------------------------------------------------------------------
+ public static class MemberMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public static final int MSG_START = 1;
+ public static final int MSG_STOP = 2;
+ public static final int MSG_PING = 3;
+ private final int msgtype;
+ private final byte[] membershipId;
+ private Member member;
+
+ public MemberMessage(byte[] membershipId, int msgtype, Member member) {
+ this.membershipId = membershipId;
+ this.msgtype = msgtype;
+ this.member = member;
+ }
+
+ public int getMsgtype() {
+ return msgtype;
+ }
+
+ public byte[] getMembershipId() {
+ return membershipId;
+ }
+
+ public Member getMember() {
+ return member;
+ }
+
+ public void setMember(Member local) {
+ this.member = local;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
+ }
+
+}
\ No newline at end of file
Propchange:
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java?rev=1836707&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
(added)
+++
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
Thu Jul 26 09:27:35 2018
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipProvider;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipService extends MembershipServiceBase {
+ private static final Log log =
LogFactory.getLog(StaticMembershipService.class);
+ protected static final StringManager sm =
StringManager.getManager(Constants.Package);
+
+ protected final ArrayList<StaticMember> staticMembers = new ArrayList<>();
+ private StaticMember localMember;
+ private StaticMembershipProvider provider;
+
+ public StaticMembershipService() {
+ //default values
+ setDefaults(this.properties);
+ }
+
+ @Override
+ public void start(int level) throws Exception {
+ if (provider != null) {
+ provider.start(level);
+ return;
+ }
+ localMember.setServiceStartTime(System.currentTimeMillis());
+ localMember.setMemberAliveTime(100);
+ // build membership provider
+ if (provider == null) {
+ provider = buildMembershipProvider();
+ }
+ provider.start(level);
+ // TODO JMX register
+ }
+
+ protected StaticMembershipProvider buildMembershipProvider() throws
Exception {
+ StaticMembershipProvider provider = new StaticMembershipProvider();
+ provider.setChannel(channel);
+ provider.setMembershipListener(this);
+ provider.setMembershipService(this);
+ provider.setStaticMembers(staticMembers);
+ properties.setProperty("membershipName", getMembershipName());
+ provider.init(properties);
+ return provider;
+ }
+
+ @Override
+ public void stop(int level) {
+ try {
+ if (provider != null && provider.stop(level)) {
+ // TODO JMX unregister
+ provider = null;
+ channel = null;;
+ }
+ } catch (Exception e) {
+ //TODO
+ log.error("stop failed", e);
+ }
+ }
+
+ @Override
+ public Member getLocalMember(boolean incAliveTime) {
+ if ( incAliveTime && localMember != null) {
+
localMember.setMemberAliveTime(System.currentTimeMillis()-localMember.getServiceStartTime());
+ }
+ return localMember;
+ }
+
+ @Override
+ public void setLocalMemberProperties(String listenHost, int listenPort,
+ int securePort, int udpPort) {
+ try {
+ localMember.setHostname(listenHost);
+ localMember.setPort(listenPort);
+ localMember.setSecurePort(securePort);
+ localMember.setUdpPort(udpPort);
+ localMember.getData(true, true);
+ } catch (IOException x) {
+ throw new IllegalArgumentException(x);
+ }
+ }
+
+ @Override
+ public void setPayload(byte[] payload) {
+ // no-op
+ }
+
+ @Override
+ public void setDomain(byte[] domain) {
+ // no-op
+ }
+
+ @Override
+ public MembershipProvider getMembershipProvider() {
+ return provider;
+ }
+
+ public ArrayList<StaticMember> getStaticMembers() {
+ return staticMembers;
+ }
+
+ public void addStaticMember(StaticMember member) {
+ staticMembers.add(member);
+ }
+
+ public void removeStaticMember(StaticMember member) {
+ staticMembers.remove(member);
+ }
+
+ public void setLocalMember(StaticMember member) {
+ this.localMember = member;
+ localMember.setLocal(true);
+ }
+
+ public long getExpirationTime() {
+ String expirationTime = properties.getProperty("expirationTime");
+ return Long.parseLong(expirationTime);
+ }
+
+ public void setExpirationTime(long expirationTime) {
+ properties.setProperty("expirationTime",
String.valueOf(expirationTime));
+ }
+
+ public int getConnectTimeout() {
+ String connectTimeout = properties.getProperty("connectTimeout");
+ return Integer.parseInt(connectTimeout);
+ }
+
+ public void setConnectTimeout(int connectTimeout) {
+ properties.setProperty("connectTimeout",
String.valueOf(connectTimeout));
+ }
+
+ public long getRpcTimeout() {
+ String rpcTimeout = properties.getProperty("rpcTimeout");
+ return Long.parseLong(rpcTimeout);
+ }
+
+ public void setRpcTimeout(long rpcTimeout) {
+ properties.setProperty("rpcTimeout", String.valueOf(rpcTimeout));
+ }
+
+ @Override
+ public void setProperties(Properties properties) {
+ setDefaults(properties);
+ this.properties = properties;
+ }
+
+ protected void setDefaults(Properties properties) {
+ // default values
+ if (properties.getProperty("expirationTime") == null)
+ properties.setProperty("expirationTime","5000");
+ if (properties.getProperty("connectTimeout") == null)
+ properties.setProperty("connectTimeout","500");
+ if (properties.getProperty("rpcTimeout") == null)
+ properties.setProperty("rpcTimeout","3000");
+ }
+
+ private String getMembershipName() {
+ return channel.getName()+"-"+"StaticMembership";
+ }
+}
\ No newline at end of file
Propchange:
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]