// Origin: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalClientMembership.java
/*=========================================================================
 * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
 * This product is protected by U.S. and international copyright
 * and intellectual property laws. Pivotal products are covered by
 * more patents listed at http://www.pivotal.io/patents.
 *=========================================================================
 */
package com.gemstone.gemfire.internal.cache.tier;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ServerLocation;
import com.gemstone.gemfire.internal.cache.CacheServerImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.management.membership.ClientMembershipEvent;
import com.gemstone.gemfire.management.membership.ClientMembershipListener;

/**
 * Handles registration and event notification duties for
 * <code>ClientMembershipListener</code>s. The public counterpart for this
 * class is {@link com.gemstone.gemfire.management.membership.ClientMembership}.
 *
 * @author Kirk Lund
 * @since 4.2.1
 */
public final class InternalClientMembership {

  private static final Logger logger = LogService.getLogger();

  /**
   * The membership listeners registered on this InternalClientMembership.
   *
   * This list is never modified in place, and a new list is installed
   * only under the control of {@link #membershipLock}.
   */
  private static volatile List<ClientMembershipListener> clientMembershipListeners =
      Collections.emptyList();

  /**
   * Must be locked whenever the reference held by the volatile field
   * {@link #clientMembershipListeners} is changed.
   */
  private static final Object membershipLock = new Object();

  /**
   * Executor used for firing client membership events asynchronously.
   *
   * Writes are synchronized via {@link #systems}. The field is volatile
   * because the notify methods read it without holding that lock.
   */
  private static volatile ThreadPoolExecutor executor;

  private static final ThreadGroup threadGroup =
      LoggingThreadGroup.createThreadGroup(
          "BridgeMembership Event Invoker Group", logger);

  /** List of connected <code>DistributedSystem</code>s; also used as a lock */
  private static final List<InternalDistributedSystem> systems =
      new ArrayList<InternalDistributedSystem>(1);

  /**
   * True if class is monitoring systems
   *
   * @guarded.By InternalClientMembership.class
   */
  private static boolean isMonitoring = false;

  /**
   * If set to true for testing then notification will be synchronous.
   * Volatile because it is written by {@link #setForceSynchronous(boolean)}
   * and read by the notify methods without any common lock.
   */
  private static volatile boolean forceSynchronous = false;

  private InternalClientMembership() {}

  /**
   * This work used to be in a class initializer. Unfortunately, this allowed
   * the class to escape before it was fully initialized, so now we just
   * make sure this work is done before any public static method on it
   * is invoked.
   */
  private static synchronized void startMonitoring() {
    if (isMonitoring) {
      return;
    }

    synchronized (systems) {
      // Initialize our own list of distributed systems via a connect listener
      List existingSystems = InternalDistributedSystem.addConnectListener(
          new InternalDistributedSystem.ConnectListener() {
            public void onConnect(InternalDistributedSystem sys) {
              addInternalDistributedSystem(sys);
            }
          });

      isMonitoring = true;

      // While still holding the lock on systems, add all currently known
      // systems to our own list
      for (Object existing : existingSystems) {
        InternalDistributedSystem sys = (InternalDistributedSystem) existing;
        try {
          if (sys.isConnected()) {
            addInternalDistributedSystem(sys);
          }
        } catch (DistributedSystemDisconnectedException ignored) {
          // the system went away while we were looking at it; nothing to
          // track (bug 37379)
        }
      }
    } // synchronized
  }

  /**
   * Registers a {@link ClientMembershipListener} for notification of connection
   * changes for CacheServer and clients. Registering a listener that is
   * already registered has no effect.
   *
   * @param listener
   *          a ClientMembershipListener to be registered
   */
  public static void registerClientMembershipListener(ClientMembershipListener listener) {
    startMonitoring();
    synchronized (membershipLock) {
      List<ClientMembershipListener> oldListeners = clientMembershipListeners;
      if (!oldListeners.contains(listener)) {
        // copy-then-swap so that readers never observe an in-place change
        List<ClientMembershipListener> newListeners =
            new ArrayList<ClientMembershipListener>(oldListeners);
        newListeners.add(listener);
        clientMembershipListeners = newListeners;
      }
    }
  }

  /**
   * Removes registration of a previously registered
   * {@link ClientMembershipListener}. Unregistering a listener that is not
   * registered has no effect.
   *
   * @param listener
   *          a ClientMembershipListener to be unregistered
   */
  public static void unregisterClientMembershipListener(ClientMembershipListener listener) {
    startMonitoring();
    synchronized (membershipLock) {
      List<ClientMembershipListener> oldListeners = clientMembershipListeners;
      if (oldListeners.contains(listener)) {
        // copy-then-swap so that readers never observe an in-place change
        List<ClientMembershipListener> newListeners =
            new ArrayList<ClientMembershipListener>(oldListeners);
        if (newListeners.remove(listener)) {
          clientMembershipListeners = newListeners;
        }
      }
    }
  }

  /**
   * Returns an array of all the currently registered
   * <code>ClientMembershipListener</code>s. Modifications to the returned array
   * will not affect the registration of these listeners.
   *
   * @return the registered <code>ClientMembershipListener</code>s; an empty
   *         array if no listeners
   */
  public static ClientMembershipListener[] getClientMembershipListeners() {
    startMonitoring();
    // Synchronization is not needed because we never modify this list
    // in place.
    List<ClientMembershipListener> snapshot = clientMembershipListeners; // volatile fetch
    return snapshot.toArray(new ClientMembershipListener[snapshot.size()]);
  }

  /**
   * Removes registration of all currently registered
   * <code>ClientMembershipListener</code>s.
   */
  public static void unregisterAllListeners() {
    startMonitoring();
    synchronized (membershipLock) {
      // the list is never modified in place, so the shared empty list is safe
      clientMembershipListeners = Collections.emptyList();
    }
  }

  /**
   * Returns a map of client memberIds to count of connections to that client.
   * The map entry key is a String representation of the client memberId, and
   * the map entry value is an Integer count of connections to that client.
   * Since a single client can have multiple ConnectionProxy objects, this
   * map will contain all the Connection objects across the ConnectionProxies
   *
   * @param onlyClientsNotifiedByThisServer true will return only those clients
   *          that are actively being updated by this server
   * @return map of client memberIds to count of connections to that client;
   *         an empty map if no <code>ClientHealthMonitor</code> instance exists
   */
  public static Map getConnectedClients(boolean onlyClientsNotifiedByThisServer) {
    ClientHealthMonitor chMon = ClientHealthMonitor.getInstance();
    if (chMon == null) {
      // Same convention as the other accessors in this class: no health
      // monitor yields an empty result rather than a NullPointerException.
      return new HashMap();
    }

    Set filterProxyIDs = null;
    if (onlyClientsNotifiedByThisServer) {
      // Note it is not necessary to synchronize on the list of cache servers here,
      // since this is only a status (snapshot) of the system.
      for (Iterator bsii = CacheFactory.getAnyInstance().getCacheServers().iterator(); bsii.hasNext();) {
        CacheServerImpl bsi = (CacheServerImpl) bsii.next();
        AcceptorImpl ai = bsi.getAcceptor();
        if (ai != null && ai.getCacheClientNotifier() != null) {
          // getActiveClients() hands back a copy set from CacheClientNotifier,
          // so it can be adopted as (or merged into) the filter directly
          Set activeClients = ai.getCacheClientNotifier().getActiveClients();
          if (filterProxyIDs != null) {
            filterProxyIDs.addAll(activeClients);
          } else {
            filterProxyIDs = activeClients;
          }
        }
      }
    }

    return chMon.getConnectedClients(filterProxyIDs);
  }

  /**
   * This method returns the CacheClientStatus for all the clients that are
   * connected to this server. This method returns all clients irrespective of
   * whether subscription is enabled or not.
   *
   * @return Map of ClientProxyMembershipID against CacheClientStatus objects;
   *         an empty map if no <code>ClientHealthMonitor</code> instance exists
   */
  public static Map getStatusForAllClientsIgnoreSubscriptionStatus() {
    // fetch once: a second getInstance() call could observe a different value
    ClientHealthMonitor chMon = ClientHealthMonitor.getInstance();
    if (chMon == null) {
      return new HashMap();
    }
    return chMon.getStatusForAllClients();
  }

  /**
   * Caller must synchronize on cache.allBridgeServersLock
   *
   * @return all the clients
   */
  public static Map getConnectedClients() {
    // Get all clients
    Map allClients = new HashMap();
    for (Iterator bsii = CacheFactory.getAnyInstance().getCacheServers().iterator(); bsii.hasNext();) {
      CacheServerImpl bsi = (CacheServerImpl) bsii.next();
      AcceptorImpl ai = bsi.getAcceptor();
      if (ai != null && ai.getCacheClientNotifier() != null) {
        allClients.putAll(ai.getCacheClientNotifier().getAllClients());
      }
    }

    // Fill in the missing info, if HealthMonitor started.
    // fetch once: a second getInstance() call could observe a different value
    ClientHealthMonitor chMon = ClientHealthMonitor.getInstance();
    if (chMon != null) {
      chMon.fillInClientInfo(allClients);
    }

    return allClients;
  }

  /**
   * Collects the client queue sizes reported by the CacheClientNotifier of
   * every cache server of the current cache into a single map.
   *
   * @return the merged result of <code>getClientQueueSizes()</code> over all
   *         cache servers; empty if there is no cache or no notifier
   */
  public static Map getClientQueueSizes() {
    Map clientQueueSizes = new HashMap();
    GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getAnyInstance();
    if (c == null) { // Add a NULL Check
      return clientQueueSizes;
    }

    for (Iterator bsii = c.getCacheServers().iterator(); bsii.hasNext();) {
      CacheServerImpl bsi = (CacheServerImpl) bsii.next();
      AcceptorImpl ai = bsi.getAcceptor();
      if (ai != null && ai.getCacheClientNotifier() != null) {
        clientQueueSizes.putAll(ai.getCacheClientNotifier().getClientQueueSizes());
      }
    }
    return clientQueueSizes;
  }

  /**
   * Returns a map of servers to count of pools connected to that server.
   * The map entry key is a String representation of the server in the form
   * <code>host[port]</code>, and the map entry value is an Integer count of
   * the pools that have an endpoint for that server.
   *
   * @return map of servers to count of pools using that server
   */
  public static Map getConnectedServers() {
    final Map map = new HashMap(); // KEY:server (String), VALUE:pool count (Integer)
    // returns an unmodifiable set
    Map/*<String,Pool>*/ poolMap = PoolManager.getAll();
    for (Iterator pools = poolMap.values().iterator(); pools.hasNext();) {
      PoolImpl pi = (PoolImpl) pools.next();
      Map/*<ServerLocation,Endpoint>*/ eps = pi.getEndpointMap();
      // only the locations are needed; the Endpoint values are not consulted
      for (Iterator locations = eps.keySet().iterator(); locations.hasNext();) {
        ServerLocation loc = (ServerLocation) locations.next();
        String server = loc.getHostName() + "[" + loc.getPort() + "]";
        Integer count = (Integer) map.get(server);
        if (count == null) {
          map.put(server, Integer.valueOf(1));
        } else {
          map.put(server, Integer.valueOf(count.intValue() + 1));
        }
      }
    }
    return map;
  }

  /**
   * Returns the connected incoming gateways as reported by the
   * <code>ClientHealthMonitor</code>.
   *
   * @return the monitor's result, or an empty map if no
   *         <code>ClientHealthMonitor</code> instance exists
   */
  public static Map getConnectedIncomingGateways() {
    ClientHealthMonitor chMon = ClientHealthMonitor.getInstance();
    if (chMon == null) {
      return new HashMap();
    }
    return chMon.getConnectedIncomingGateways();
  }

  /**
   * Notifies registered listeners that a member has joined. The new
   * member may be a client connecting to this process or a cache
   * server that this process has just connected to.
   *
   * @param member the <code>DistributedMember</code>
   * @param client true if the member is a client; false if cache server
   */
  public static void notifyJoined(final DistributedMember member, final boolean client) {
    notifyListeners(member, client, EventType.CLIENT_JOINED);
  }

  /**
   * Notifies registered listeners that a member has left. The departed
   * member may be a client previously connected to this process or a
   * cache server that this process was connected to.
   *
   * @param member the <code>DistributedMember</code>
   * @param client true if the member is a client; false if cache server
   */
  public static void notifyLeft(final DistributedMember member, final boolean client) {
    notifyListeners(member, client, EventType.CLIENT_LEFT);
  }

  /**
   * Notifies registered listeners that a member has crashed. The
   * departed member may be a client previously connected to this
   * process or a cache server that this process was connected to.
   *
   * @param member the <code>DistributedMember</code>
   * @param client true if the member is a client; false if cache server
   */
  public static void notifyCrashed(final DistributedMember member, final boolean client) {
    notifyListeners(member, client, EventType.CLIENT_CRASHED);
  }

  /**
   * Shared implementation of the three public notify methods: builds the
   * event and delivers it to the listeners, either inline (when
   * {@link #forceSynchronous} is set) or on the event executor.
   * Does nothing if no executor is currently running.
   */
  private static void notifyListeners(final DistributedMember member,
      final boolean client, final EventType eventType) {
    startMonitoring();
    // read the volatile field once so the null check and the use agree
    final ThreadPoolExecutor queuedExecutor = executor;
    if (queuedExecutor == null) {
      return;
    }

    final ClientMembershipEvent event =
        new InternalBridgeMembershipEvent(member, client);
    if (forceSynchronous) {
      doNotifyClientMembershipListener(event, eventType);
      return;
    }

    try {
      queuedExecutor.execute(new Runnable() {
        @Override
        public void run() {
          doNotifyClientMembershipListener(event, eventType);
        }
      });
    } catch (RejectedExecutionException ignored) {
      // executor must have been shutdown; the event is intentionally dropped
    }
  }

  /**
   * Invokes the listener callback matching <code>eventType</code> on every
   * currently registered listener. A <code>CancelException</code> from a
   * listener stops delivery to the remaining listeners; any other non-fatal
   * throwable is logged and delivery continues with the next listener.
   */
  private static void doNotifyClientMembershipListener(
      ClientMembershipEvent clientMembershipEvent, EventType eventType) {
    // iterating the volatile reference works on one immutable snapshot
    for (ClientMembershipListener listener : clientMembershipListeners) {
      try {
        switch (eventType) {
          case CLIENT_JOINED:
            listener.memberJoined(clientMembershipEvent);
            break;
          case CLIENT_LEFT:
            listener.memberLeft(clientMembershipEvent);
            break;
          default:
            listener.memberCrashed(clientMembershipEvent);
            break;
        }
      } catch (CancelException e) {
        // this can be thrown by a cache server when the system is shutting
        // down
        return;
      } catch (VirtualMachineError e) {
        SystemFailure.initiateFailure(e);
        throw e;
      } catch (Throwable t) {
        SystemFailure.checkFailure();
        logger.warn(LocalizedMessage.create(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION), t);
      }
    }
  }

  /**
   * Starts tracking the given system: installs a disconnect listener on it,
   * records it in {@link #systems} and makes sure the event executor exists.
   */
  protected static void addInternalDistributedSystem(InternalDistributedSystem s) {
    synchronized (systems) {
      s.addDisconnectListener(
          new InternalDistributedSystem.DisconnectListener() {
            @Override
            public String toString() {
              return "Disconnect listener for InternalBridgeMembership";
            }

            public void onDisconnect(InternalDistributedSystem ss) {
              removeInternalDistributedSystem(ss);
            }
          });
      systems.add(s);
      // make sure executor is alive
      ensureExecutorIsRunning(); // optimized to do nothing if already running
    }
  }

  /**
   * Stops tracking the given system. When the last tracked system is removed
   * the event executor is shut down and discarded.
   */
  protected static void removeInternalDistributedSystem(InternalDistributedSystem sys) {
    synchronized (systems) {
      systems.remove(sys);
      if (systems.isEmpty()) {
        // clean up executor
        ThreadPoolExecutor current = executor;
        if (current != null) {
          current.shutdown();
        }
        // Do not clear the executor's queue here: clearing it removed the
        // shutdown token from the queue (bug 35675).
        executor = null;
      }
    }
  }

  /**
   * Creates the single-threaded event executor if there is none.
   *
   * @guarded.By {@link #systems}
   */
  private static void ensureExecutorIsRunning() {
    // protected by calling method synchronized on systems
    if (executor != null) {
      return;
    }
    final ThreadGroup group = threadGroup;
    ThreadFactory tf = new ThreadFactory() {
      @Override
      public Thread newThread(Runnable command) {
        Thread thread =
            new Thread(group, command, "BridgeMembership Event Invoker");
        thread.setDaemon(true);
        return thread;
      }
    };
    LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
    executor = new ThreadPoolExecutor(1, 1/*max unused*/,
        15, TimeUnit.SECONDS, q, tf);
  }

  /**
   * Internal implementation of ClientMembershipEvent.
   */
  protected static class InternalBridgeMembershipEvent
      implements ClientMembershipEvent {

    private final DistributedMember member;
    private final boolean client;

    /** Constructs new instance of event */
    protected InternalBridgeMembershipEvent(DistributedMember member, boolean client) {
      this.member = member;
      this.client = client;
    }

    /** Returns the member this event is about; may be null */
    public DistributedMember getMember() {
      return this.member;
    }

    /** Returns the member's id, or "unknown" if the member is null */
    public String getMemberId() {
      return this.member == null ? "unknown" : this.member.getId();
    }

    /** Returns the <code>client</code> flag this event was constructed with */
    public boolean isClient() {
      return this.client;
    }

    @Override // GemStoneAddition
    public String toString() {
      final StringBuilder sb = new StringBuilder("[BridgeMembershipEvent: ");
      sb.append("member=").append(this.member);
      sb.append(", isClient=").append(this.client);
      sb.append("]");
      return sb.toString();
    }
  }

  /** Set to true if synchronous notification is needed for testing */
  public static void setForceSynchronous(boolean value) {
    forceSynchronous = value;
  }

  /** The kind of membership change being delivered to listeners */
  private enum EventType {
    CLIENT_JOINED,
    CLIENT_LEFT,
    CLIENT_CRASHED
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java index b307b95..9fe595b 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java @@ -47,7 +47,7 @@ import com.gemstone.gemfire.internal.cache.tier.Acceptor; import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; import com.gemstone.gemfire.internal.cache.tier.ClientHandShake; import com.gemstone.gemfire.internal.cache.tier.Command; -import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership; +import com.gemstone.gemfire.internal.cache.tier.InternalClientMembership; import com.gemstone.gemfire.internal.cache.tier.MessageType; import com.gemstone.gemfire.internal.cache.tier.sockets.command.Default; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; @@ -571,7 +571,7 @@ public class ServerConnection implements Runnable { this.crHelper.checkCancelInProgress(null); if (clientJoined && isFiringMembershipEvents()) { // This is a new client. Notify bridge membership and heartbeat monitor. - InternalBridgeMembership.notifyJoined(this.proxyId.getDistributedMember(), + InternalClientMembership.notifyJoined(this.proxyId.getDistributedMember(), true); } @@ -871,9 +871,9 @@ public class ServerConnection implements Runnable { // the heartbeat monitor; other wise just remove the connection. 
if (clientDeparted && isFiringMembershipEvents()) { if (this.clientDisconnectedCleanly && !forceClientCrashEvent) { - InternalBridgeMembership.notifyLeft(proxyId.getDistributedMember(), true); + InternalClientMembership.notifyLeft(proxyId.getDistributedMember(), true); } else { - InternalBridgeMembership.notifyCrashed(this.proxyId.getDistributedMember(), true); + InternalClientMembership.notifyCrashed(this.proxyId.getDistributedMember(), true); } // The client has departed. Remove this last connection and unregister it. } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java index a40d5d5..3116573 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/CacheServerBridge.java @@ -35,14 +35,13 @@ import com.gemstone.gemfire.cache.server.CacheServer; import com.gemstone.gemfire.cache.server.ServerLoad; import com.gemstone.gemfire.cache.server.ServerLoadProbe; import com.gemstone.gemfire.cache.server.internal.ServerMetricsImpl; -import com.gemstone.gemfire.cache.util.BridgeMembershipListener; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion; import com.gemstone.gemfire.internal.admin.remote.ClientHealthStats; import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; -import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership; +import 
com.gemstone.gemfire.internal.cache.tier.InternalClientMembership; import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; @@ -60,6 +59,7 @@ import com.gemstone.gemfire.management.internal.beans.stats.StatsAverageLatency; import com.gemstone.gemfire.management.internal.beans.stats.StatsKey; import com.gemstone.gemfire.management.internal.beans.stats.StatsRate; import com.gemstone.gemfire.management.internal.cli.CliUtil; +import com.gemstone.gemfire.management.membership.ClientMembershipListener; /** * Represents the GemFire CacheServer . Provides data and notifications about @@ -86,7 +86,7 @@ public class CacheServerBridge extends ServerBridge{ private MemberMBeanBridge memberMBeanBridge; - private BridgeMembershipListener membershipListener; + private ClientMembershipListener membershipListener; public static ThreadLocal<Version> clientVersion = new ThreadLocal<Version>(); @@ -661,16 +661,16 @@ public class CacheServerBridge extends ServerBridge{ } public int getNumSubscriptions() { - Map clientProxyMembershipIDMap = InternalBridgeMembership.getClientQueueSizes(); + Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes(); return clientProxyMembershipIDMap.keySet().size(); } - public void setBridgeMembershipListener( - BridgeMembershipListener membershipListener) { + public void setClientMembershipListener( + ClientMembershipListener membershipListener) { this.membershipListener = membershipListener; } - public BridgeMembershipListener getBridgeMembershipListener() { + public ClientMembershipListener getClientMembershipListener() { return this.membershipListener; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/ManagementAdapter.java 
---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/ManagementAdapter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/ManagementAdapter.java index 3d6331d..d22480b 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/ManagementAdapter.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/ManagementAdapter.java @@ -31,10 +31,6 @@ import com.gemstone.gemfire.cache.DiskStore; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.util.BridgeMembership; -import com.gemstone.gemfire.cache.util.BridgeMembershipEvent; -import com.gemstone.gemfire.cache.util.BridgeMembershipListener; -import com.gemstone.gemfire.cache.util.BridgeMembershipListenerAdapter; import com.gemstone.gemfire.cache.wan.GatewayReceiver; import com.gemstone.gemfire.cache.wan.GatewaySender; import com.gemstone.gemfire.distributed.Locator; @@ -66,6 +62,10 @@ import com.gemstone.gemfire.management.internal.FederationComponent; import com.gemstone.gemfire.management.internal.MBeanJMXAdapter; import com.gemstone.gemfire.management.internal.ManagementConstants; import com.gemstone.gemfire.management.internal.SystemManagementService; +import com.gemstone.gemfire.management.membership.ClientMembership; +import com.gemstone.gemfire.management.membership.ClientMembershipEvent; +import com.gemstone.gemfire.management.membership.ClientMembershipListener; +import com.gemstone.gemfire.management.membership.ClientMembershipListenerAdapter; import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration; /** @@ -691,11 +691,11 @@ public class ManagementAdapter { ObjectName changedMBeanName = service.registerInternalMBean( (CacheServerMXBean) cacheServerMBean, cacheServerMBeanName); - 
BridgeMembershipListener managementBridgeListener = new CacheServerMembershipListenerAdapter(cacheServerMBean, + ClientMembershipListener managementClientListener = new CacheServerMembershipListenerAdapter(cacheServerMBean, memberLevelNotifEmitter, changedMBeanName); - BridgeMembership.registerBridgeMembershipListener(managementBridgeListener); + ClientMembership.registerClientMembershipListener(managementClientListener); - cacheServerBridge.setBridgeMembershipListener(managementBridgeListener); + cacheServerBridge.setClientMembershipListener(managementClientListener); service.federate(changedMBeanName, CacheServerMXBean.class, true); @@ -725,11 +725,11 @@ public class ManagementAdapter { CacheServerMBean mbean = (CacheServerMBean) service .getLocalCacheServerMXBean(server.getPort()); - BridgeMembershipListener listener = mbean.getBridge() - .getBridgeMembershipListener(); + ClientMembershipListener listener = mbean.getBridge() + .getClientMembershipListener(); if(listener != null){ - BridgeMembership.unregisterBridgeMembershipListener(listener); + ClientMembership.unregisterClientMembershipListener(listener); } @@ -824,11 +824,11 @@ public class ManagementAdapter { .getLocalCacheServerMXBean(server.getPort()); if (mbean != null) { - BridgeMembershipListener listener = mbean.getBridge() - .getBridgeMembershipListener(); + ClientMembershipListener listener = mbean.getBridge() + .getClientMembershipListener(); if (listener != null) { - BridgeMembership.unregisterBridgeMembershipListener(listener); + ClientMembership.unregisterClientMembershipListener(listener); } } @@ -1057,12 +1057,12 @@ public class ManagementAdapter { /** - * Private class which acts as a BridgeMembershipListener to propagate client + * Private class which acts as a ClientMembershipListener to propagate client * joined/left notifications */ private static class CacheServerMembershipListenerAdapter extends - BridgeMembershipListenerAdapter { + ClientMembershipListenerAdapter { private 
NotificationBroadcasterSupport serverLevelNotifEmitter; private NotificationBroadcasterSupport memberLevelNotifEmitter; @@ -1078,9 +1078,9 @@ public class ManagementAdapter { /** * Invoked when a client has connected to this process or when this process - * has connected to a BridgeServer. + * has connected to a CacheServer. */ - public void memberJoined(BridgeMembershipEvent event) { + public void memberJoined(ClientMembershipEvent event) { Notification notification = new Notification(JMXNotificationType.CLIENT_JOINED, serverSource, SequenceNumber .next(), System.currentTimeMillis(), ManagementConstants.CLIENT_JOINED_PREFIX + event.getMemberId()); serverLevelNotifEmitter.sendNotification(notification); @@ -1090,9 +1090,9 @@ public class ManagementAdapter { /** * Invoked when a client has gracefully disconnected from this process or - * when this process has gracefully disconnected from a BridgeServer. + * when this process has gracefully disconnected from a CacheServer. */ - public void memberLeft(BridgeMembershipEvent event) { + public void memberLeft(ClientMembershipEvent event) { Notification notification = new Notification(JMXNotificationType.CLIENT_LEFT, serverSource, SequenceNumber .next(), System.currentTimeMillis(), ManagementConstants.CLIENT_LEFT_PREFIX + event.getMemberId()); serverLevelNotifEmitter.sendNotification(notification); @@ -1101,9 +1101,9 @@ public class ManagementAdapter { /** * Invoked when a client has unexpectedly disconnected from this process or - * when this process has unexpectedly disconnected from a BridgeServer. + * when this process has unexpectedly disconnected from a CacheServer. 
*/ - public void memberCrashed(BridgeMembershipEvent event) { + public void memberCrashed(ClientMembershipEvent event) { Notification notification = new Notification(JMXNotificationType.CLIENT_CRASHED, serverSource, SequenceNumber .next(), System.currentTimeMillis(), ManagementConstants.CLIENT_CRASHED_PREFIX + event.getMemberId()); serverLevelNotifEmitter.sendNotification(notification); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/GetMemberInformationFunction.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/GetMemberInformationFunction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/GetMemberInformationFunction.java index face825..ff4d945 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/GetMemberInformationFunction.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/GetMemberInformationFunction.java @@ -25,7 +25,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.internal.InternalEntity; import com.gemstone.gemfire.internal.cache.CacheClientStatus; -import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership; +import com.gemstone.gemfire.internal.cache.tier.InternalClientMembership; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; import com.gemstone.gemfire.management.internal.cli.CliUtil; import com.gemstone.gemfire.management.internal.cli.domain.CacheServerInfo; @@ -114,7 +114,7 @@ public class GetMemberInformationFunction extends FunctionAdapter implements Int CacheServerInfo cacheServerInfo = new CacheServerInfo(bindAddress, port, isRunning); 
memberInfo.addCacheServerInfo(cacheServerInfo); } - Map<ClientProxyMembershipID, CacheClientStatus> allConnectedClients = InternalBridgeMembership + Map<ClientProxyMembershipID, CacheClientStatus> allConnectedClients = InternalClientMembership .getStatusForAllClientsIgnoreSubscriptionStatus(); Iterator<ClientProxyMembershipID> it = allConnectedClients.keySet().iterator(); int numConnections = 0; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/main/java/com/gemstone/gemfire/management/membership/ClientMembership.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/membership/ClientMembership.java b/gemfire-core/src/main/java/com/gemstone/gemfire/management/membership/ClientMembership.java index e5369c1..e07f19b 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/membership/ClientMembership.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/membership/ClientMembership.java @@ -7,7 +7,7 @@ */ package com.gemstone.gemfire.management.membership; -import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership; +import com.gemstone.gemfire.internal.cache.tier.InternalClientMembership; /** * Provides utility methods for registering and unregistering @@ -29,7 +29,7 @@ public final class ClientMembership { * a ClientMembershipListener to be registered */ public static void registerClientMembershipListener(ClientMembershipListener listener) { - InternalBridgeMembership.registerClientMembershipListener(listener); + InternalClientMembership.registerClientMembershipListener(listener); } /** @@ -40,7 +40,7 @@ public final class ClientMembership { * a ClientMembershipListener to be unregistered */ public static void unregisterClientMembershipListener(ClientMembershipListener listener) { - InternalBridgeMembership.unregisterClientMembershipListener(listener); + 
InternalClientMembership.unregisterClientMembershipListener(listener); } /** @@ -52,7 +52,7 @@ public final class ClientMembership { * array if no listeners */ public static ClientMembershipListener[] getClientMembershipListeners() { - return InternalBridgeMembership.getClientMembershipListeners(); + return InternalClientMembership.getClientMembershipListeners(); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1e5fc5b4/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java index 8b2cf0b..0e63bff 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/client/internal/AutoConnectionSourceDUnitTest.java @@ -20,12 +20,12 @@ import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.client.NoAvailableServersException; import com.gemstone.gemfire.cache.client.PoolManager; import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.util.BridgeMembership; -import com.gemstone.gemfire.cache.util.BridgeMembershipEvent; -import com.gemstone.gemfire.cache.util.BridgeMembershipListenerAdapter; import com.gemstone.gemfire.distributed.internal.ServerLocation; import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.management.membership.ClientMembership; +import com.gemstone.gemfire.management.membership.ClientMembershipEvent; +import com.gemstone.gemfire.management.membership.ClientMembershipListenerAdapter; import dunit.Host; import dunit.SerializableCallable; @@ -282,7 +282,7 
@@ public class AutoConnectionSourceDUnitTest extends LocatorTestBase { checkEndpoints(vm2, new int[] {serverPort1}); } - public void testBridgeMembershipListener() throws Exception { + public void testClientMembershipListener() throws Exception { final Host host = Host.getHost(0); VM locatorVM = host.getVM(0); VM bridge1VM = host.getVM(1); @@ -452,7 +452,7 @@ public class AutoConnectionSourceDUnitTest extends LocatorTestBase { vm.invoke(new SerializableRunnable("Add membership listener") { public void run() { MyListener listener = new MyListener(); - BridgeMembership.registerBridgeMembershipListener(listener); + ClientMembership.registerClientMembershipListener(listener); remoteObjects.put(BRIDGE_LISTENER, listener); } }); @@ -541,12 +541,13 @@ public class AutoConnectionSourceDUnitTest extends LocatorTestBase { }); } - public static class MyListener extends BridgeMembershipListenerAdapter implements Serializable { + public static class MyListener extends ClientMembershipListenerAdapter implements Serializable { protected int crashes = 0; protected int joins = 0; protected int departures= 0; - public synchronized void memberCrashed(BridgeMembershipEvent event) { + @Override + public synchronized void memberCrashed(ClientMembershipEvent event) { crashes++; notifyAll(); } @@ -557,12 +558,14 @@ public class AutoConnectionSourceDUnitTest extends LocatorTestBase { departures = 0; } - public synchronized void memberJoined(BridgeMembershipEvent event) { + @Override + public synchronized void memberJoined(ClientMembershipEvent event) { joins++; notifyAll(); } - public synchronized void memberLeft(BridgeMembershipEvent event) { + @Override + public synchronized void memberLeft(ClientMembershipEvent event) { departures++; notifyAll(); }