Author: kfujino
Date: Thu Jul 26 09:27:35 2018
New Revision: 1836707

URL: http://svn.apache.org/viewvc?rev=1836707&view=rev
Log:
Add New Static Membership Service implementations.
- initial implementaion that remain a lot of TODOs. 

Added:
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
   (with props)
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
   (with props)

Added: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java?rev=1836707&view=auto
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
 (added)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
 Thu Jul 26 09:27:35 2018
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipProvider extends MembershipProviderBase 
implements RpcCallback, ChannelListener, Heartbeat {
+
+    protected static final StringManager sm = 
StringManager.getManager(StaticMembershipProvider.class);
+    private static final Log log = 
LogFactory.getLog(StaticMembershipProvider.class);
+ 
+    protected Channel channel;
+    protected RpcChannel rpcChannel;
+    protected MembershipService service;
+    private String membershipName = null;
+    private byte[] membershipId = null;
+    protected ArrayList<StaticMember> staticMembers;
+    protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS;
+    protected long expirationTime = 5000;
+    protected int connectTimeout = 500;
+    protected long rpcTimeout = 3000;
+    protected int startLevel = 0;
+
+    @Override
+    public void init(Properties properties) throws Exception {
+        String expirationTimeStr = properties.getProperty("expirationTime");
+        this.expirationTime = Long.parseLong(expirationTimeStr);
+        String connectTimeoutStr = properties.getProperty("connectTimeout");
+        this.connectTimeout = Integer.parseInt(connectTimeoutStr);
+        String rpcTimeouStr = properties.getProperty("rpcTimeout");
+        this.rpcTimeout = Long.parseLong(rpcTimeouStr);
+        this.membershipName = properties.getProperty("membershipName");;
+        this.membershipId = 
membershipName.getBytes(StandardCharsets.ISO_8859_1);
+        membership = new Membership(service.getLocalMember(true));
+        this.rpcChannel = new RpcChannel(this.membershipId, channel, this);
+        this.channel.addChannelListener(this);
+    }
+
+    @Override
+    public void start(int level) throws Exception {
+        if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+            //no-op
+        }
+        if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+            //no-op
+        }
+        startLevel = (startLevel | level);
+        if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) {
+            startMembership(getAliveMembers(staticMembers.toArray(new 
Member[0])));
+        }
+    }
+
+    @Override
+    public boolean stop(int level) throws Exception {
+        if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+            // no-op
+        }
+        if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+            // no-op
+        }
+        startLevel = (startLevel & (~level));
+        if ( startLevel == 0 ) {
+            if (this.rpcChannel != null) {
+                this.rpcChannel.breakdown();
+            }
+            if (this.channel != null) {
+                try {
+                    stopMembership(this.getMembers());
+                } catch ( Exception ignore){}
+                this.channel.removeChannelListener(this);
+                this.channel = null;
+            }
+            this.rpcChannel = null;
+            this.membership.reset();
+        }
+        return (startLevel == 0);
+    }
+
+    protected void startMembership(Member[] members) throws ChannelException {
+        if (members.length == 0) return;
+        MemberMessage msg = new MemberMessage(membershipId, 
MemberMessage.MSG_START, service.getLocalMember(true));
+        Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, 
sendOptions, rpcTimeout);
+        if (resp.length > 0) {
+            for (int i = 0; i < resp.length; i++) {
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
+            }
+        } else {
+            log.warn("no response");
+        }
+    }
+
+    protected Member setupMember(Member mbr) {
+        // no-op
+        return mbr;
+    }
+
+    protected void memberAdded(Member member) {
+        Member mbr = setupMember(member);
+        if(membership.memberAlive(mbr)) {
+            // TODO invoke thread
+            membershipListener.memberAdded(mbr);
+        }
+    }
+
+    protected void memberDisappeared(Member member) {
+        membership.removeMember(member);
+        // TODO invoke thread
+        membershipListener.memberDisappeared(member);
+    }
+
+    protected void memberAlive(Member member) {
+        if (!membership.contains(member)) memberAdded(member);
+        membership.memberAlive(member);
+    }
+
+    protected void stopMembership(Member[] members) {
+        if (members.length == 0 ) return;
+        MemberMessage msg = new MemberMessage(membershipId, 
MemberMessage.MSG_STOP, service.getLocalMember(true));
+        try {
+            channel.send(members, msg, sendOptions);
+        } catch (ChannelException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void messageReceived(Serializable msg, Member sender) {
+        MemberMessage memMsg = (MemberMessage) msg;
+        Member member = memMsg.getMember();
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            memberAdded(member);
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) {
+            memberDisappeared(member);
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            memberAlive(member);
+        }
+    }
+
+    @Override
+    public boolean accept(Serializable msg, Member sender) {
+        boolean result = false;
+        if (msg instanceof MemberMessage) {
+            result = Arrays.equals(this.membershipId, ((MemberMessage) 
msg).getMembershipId());
+        }
+        return result;
+    }
+
+    @Override
+    public Serializable replyRequest(Serializable msg, final Member sender) {
+        if (!(msg instanceof MemberMessage)) return null;
+        MemberMessage memMsg = (MemberMessage) msg;
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            messageReceived(memMsg, sender);
+            memMsg.setMember(service.getLocalMember(true));
+            return memMsg;
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            messageReceived(memMsg, sender);
+            memMsg.setMember(service.getLocalMember(true));
+            return memMsg;
+        } else {
+            // other messages are ignored.
+            if (log.isInfoEnabled())
+                log.info("never happen");
+            return null;
+        }
+    }
+
+    @Override
+    public void leftOver(Serializable msg, Member sender) {
+        if (!(msg instanceof MemberMessage)) return;
+        MemberMessage memMsg = (MemberMessage) msg;
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            //TODO
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            //TODO
+        } else {
+            // other messages are ignored.
+            if (log.isInfoEnabled())
+                log.info("never happen");
+        }
+    }
+
+    @Override
+    public void heartbeat() {
+        try {
+            ping();
+        } catch (ChannelException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    protected void ping() throws ChannelException {
+        // send ping
+        Member[] members = getAliveMembers(staticMembers.toArray(new 
Member[0]));
+        if (members.length == 0) return;
+        try {
+            MemberMessage msg = new MemberMessage(membershipId, 
MemberMessage.MSG_PING, service.getLocalMember(true));
+            Response[] resp = rpcChannel.send(members, msg, 
RpcChannel.ALL_REPLY, sendOptions, rpcTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
+            }
+        } catch (ChannelException ce) {
+            // Handle known failed members
+            FaultyMember[] faultyMembers = ce.getFaultyMembers();
+            for (FaultyMember faultyMember : faultyMembers) {
+                memberDisappeared(faultyMember.getMember());
+            }
+            throw ce;
+        }
+        // expire
+        checkExpired();
+    }
+
+    protected void checkExpired() {
+        Member[] expired = membership.expire(expirationTime);
+        for (Member member : expired) {
+            membershipListener.memberDisappeared(member);
+        }
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void setMembershipService(MembershipService service) {
+        this.service = service;
+    }
+
+    public void setStaticMembers(ArrayList<StaticMember> staticMembers) {
+        this.staticMembers = staticMembers;
+    }
+
+    private Member[] getAliveMembers(Member[] members) {
+        List<Member> aliveMembers = new ArrayList<>();
+        for (Member member : members) {
+            try (Socket socket = new Socket()) {
+                InetAddress ia = InetAddress.getByAddress(member.getHost());
+                InetSocketAddress addr = new InetSocketAddress(ia, 
member.getPort());
+                socket.connect(addr, connectTimeout);
+                aliveMembers.add(member);
+            } catch (Exception x) {//no-op
+            }
+        }
+        return aliveMembers.toArray(new Member[0]);
+    }
+
+    // 
------------------------------------------------------------------------------
+    // member message to send to and from other memberships
+    // 
------------------------------------------------------------------------------
+    public static class MemberMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+        public static final int MSG_START = 1;
+        public static final int MSG_STOP = 2;
+        public static final int MSG_PING = 3;
+        private final int msgtype;
+        private final byte[] membershipId;
+        private Member member;
+
+        public MemberMessage(byte[] membershipId, int msgtype, Member member) {
+            this.membershipId = membershipId;
+            this.msgtype = msgtype;
+            this.member = member;
+        }
+
+        public int getMsgtype() {
+            return msgtype;
+        }
+
+        public byte[] getMembershipId() {
+            return membershipId;
+        }
+
+        public Member getMember() {
+            return member;
+        }
+
+        public void setMember(Member local) {
+            this.member = local;
+        }
+
+        @Override
+        public String toString() {
+            return super.toString();
+        }
+    }
+
+}
\ No newline at end of file

Propchange: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java?rev=1836707&view=auto
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
 (added)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
 Thu Jul 26 09:27:35 2018
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipProvider;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipService extends MembershipServiceBase {
+    private static final Log log = 
LogFactory.getLog(StaticMembershipService.class);
+    protected static final StringManager sm = 
StringManager.getManager(Constants.Package);
+
+    protected final ArrayList<StaticMember> staticMembers = new ArrayList<>();
+    private StaticMember localMember;
+    private StaticMembershipProvider provider;
+
+    public StaticMembershipService() {
+        //default values
+        setDefaults(this.properties);
+    }
+
+    @Override
+    public void start(int level) throws Exception {
+        if (provider != null) {
+            provider.start(level);
+            return;
+        }
+        localMember.setServiceStartTime(System.currentTimeMillis());
+        localMember.setMemberAliveTime(100);
+        // build membership provider
+        if (provider == null) {
+            provider = buildMembershipProvider();
+        }
+        provider.start(level);
+        // TODO JMX register
+    }
+
+    protected StaticMembershipProvider buildMembershipProvider() throws 
Exception {
+        StaticMembershipProvider provider = new StaticMembershipProvider();
+        provider.setChannel(channel);
+        provider.setMembershipListener(this);
+        provider.setMembershipService(this);
+        provider.setStaticMembers(staticMembers);
+        properties.setProperty("membershipName", getMembershipName());
+        provider.init(properties);
+        return provider;
+    }
+
+    @Override
+    public void stop(int level) {
+        try {
+            if (provider != null && provider.stop(level)) {
+                // TODO JMX unregister
+                provider = null;
+                channel = null;;
+            }
+        } catch (Exception e) {
+            //TODO
+            log.error("stop failed", e);
+        }
+    }
+
+    @Override
+    public Member getLocalMember(boolean incAliveTime) {
+        if ( incAliveTime && localMember != null) {
+            
localMember.setMemberAliveTime(System.currentTimeMillis()-localMember.getServiceStartTime());
+        }
+        return localMember;
+    }
+
+    @Override
+    public void setLocalMemberProperties(String listenHost, int listenPort, 
+            int securePort, int udpPort) {
+        try {
+            localMember.setHostname(listenHost);
+            localMember.setPort(listenPort);
+            localMember.setSecurePort(securePort);
+            localMember.setUdpPort(udpPort);
+            localMember.getData(true, true);
+        } catch (IOException x) {
+            throw new IllegalArgumentException(x);
+        }
+    }
+
+    @Override
+    public void setPayload(byte[] payload) {
+        // no-op
+    }
+
+    @Override
+    public void setDomain(byte[] domain) {
+        // no-op
+    }
+
+    @Override
+    public MembershipProvider getMembershipProvider() {
+        return provider;
+    }
+
+    public ArrayList<StaticMember> getStaticMembers() {
+        return staticMembers;
+    }
+
+    public void addStaticMember(StaticMember member) {
+        staticMembers.add(member);
+    }
+
+    public void removeStaticMember(StaticMember member) {
+        staticMembers.remove(member);
+    }
+
+    public void setLocalMember(StaticMember member) {
+        this.localMember = member;
+        localMember.setLocal(true);
+    }
+
+     public long getExpirationTime() {
+         String expirationTime = properties.getProperty("expirationTime");
+         return Long.parseLong(expirationTime);
+     }
+
+    public void setExpirationTime(long expirationTime) {
+        properties.setProperty("expirationTime", 
String.valueOf(expirationTime));
+    }
+
+     public int getConnectTimeout() {
+         String connectTimeout = properties.getProperty("connectTimeout");
+         return Integer.parseInt(connectTimeout);
+     }
+
+    public void setConnectTimeout(int connectTimeout) {
+        properties.setProperty("connectTimeout", 
String.valueOf(connectTimeout));
+    }
+
+    public long getRpcTimeout() {
+        String rpcTimeout = properties.getProperty("rpcTimeout");
+        return Long.parseLong(rpcTimeout);
+    }
+
+    public void setRpcTimeout(long rpcTimeout) {
+        properties.setProperty("rpcTimeout", String.valueOf(rpcTimeout));
+    }
+
+    @Override
+    public void setProperties(Properties properties) {
+        setDefaults(properties);
+        this.properties = properties;
+    }
+
+    protected void setDefaults(Properties properties) {
+        // default values
+        if (properties.getProperty("expirationTime") == null)
+            properties.setProperty("expirationTime","5000");
+        if (properties.getProperty("connectTimeout") == null)
+            properties.setProperty("connectTimeout","500");
+        if (properties.getProperty("rpcTimeout") == null)
+            properties.setProperty("rpcTimeout","3000");
+    }
+
+    private String getMembershipName() {
+        return channel.getName()+"-"+"StaticMembership";
+    }
+}
\ No newline at end of file

Propchange: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to