http://git-wip-us.apache.org/repos/asf/geode/blob/eb481eaa/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index e79bfbd..4bd4970 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -93,7 +93,6 @@ import org.apache.geode.internal.cache.ClientRegionEventImpl; import org.apache.geode.internal.cache.ClientServerObserver; import org.apache.geode.internal.cache.ClientServerObserverHolder; import org.apache.geode.internal.cache.Conflatable; -import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; @@ -120,29 +119,158 @@ import org.apache.geode.security.AuthenticationFailedException; import org.apache.geode.security.AuthenticationRequiredException; /** - * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections - * to clients requesting notification of updates and notifies them when updates occur. + * Class {@code CacheClientNotifier} works on the server and manages client socket connections to + * clients requesting notification of updates and notifies them when updates occur. * * @since GemFire 3.2 */ -@SuppressWarnings({"synthetic-access", "deprecation"}) public class CacheClientNotifier { private static final Logger logger = LogService.getLogger(); private static volatile CacheClientNotifier ccnSingleton; /** - * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance. + * The map of known {@code CacheClientProxy} instances. Maps ClientProxyMembershipID to + * CacheClientProxy. Note that the keys in this map are not updated when a durable client + * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the + * CacheClientProxy and then call getProxyID on it. + * <p> + * NOTE: package-private to avoid synthetic accessor + */ + final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ clientProxies = + new ConcurrentHashMap(); + + /** + * The map of {@code CacheClientProxy} instances which are getting initialized. Maps + * ClientProxyMembershipID to CacheClientProxy. + */ + private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ initClientProxies = + new ConcurrentHashMap(); + + private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>(); + + /** + * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use + * a direct reference to cache in CacheClientNotifier code. Instead, you should always use + * {@code getCache()} + */ + private InternalCache cache; // TODO: fix synchronization of cache + + private InternalLogWriter logWriter; + + /** + * The GemFire security {@code LogWriter} + */ + private InternalLogWriter securityLogWriter; + + /** the maximum number of messages that can be enqueued in a client-queue. */ + private final int maximumMessageCount; + + /** + * the time (in seconds) after which a message in the client queue will expire. + */ + private final int messageTimeToLive; + + /** + * A listener which receives notifications about queues that are added or removed + */ + private final ConnectionListener connectionListener; + + private final CacheServerStats acceptorStats; + + /** + * haContainer can hold either the name of the client-messages-region (in case of eviction + * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In + * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value. + */ + private volatile HAContainerWrapper haContainer; + + /** + * The size of the server-to-client communication socket buffers. This can be modified using the + * BridgeServer.SOCKET_BUFFER_SIZE system property. + */ + private static final int socketBufferSize = + Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768); + + /** + * The statistics for this notifier + */ + final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy then + // make private + + /** + * The {@code InterestRegistrationListener} instances registered in this VM. This is used when + * modifying the set of listeners. + */ + private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet(); + + /** + * The {@code InterestRegistrationListener} instances registered in this VM. This is used to + * provide a read-only {@code Set} of listeners. + */ + private final Set readableInterestRegistrationListeners = + Collections.unmodifiableSet(this.writableInterestRegistrationListeners); + + /** + * System property name for indicating how much frequently the "Queue full" message should be + * logged. + */ + private static final String MAX_QUEUE_LOG_FREQUENCY = + DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit"; + + public static final long DEFAULT_LOG_FREQUENCY = 1000; + + private static final String EVENT_ENQUEUE_WAIT_TIME_NAME = + DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME"; + + private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; + + /** + * System property value denoting the time in milliseconds. Any thread putting an event into a + * subscription queue, which is full, will wait this much time for the queue to make space. It'll + * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See + * #51400. + */ + public static int eventEnqueueWaitTime; // TODO: encapsulate eventEnqueueWaitTime + + /** + * The frequency of logging the "Queue full" message. + */ + private long logFrequency = DEFAULT_LOG_FREQUENCY; + + private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>(); + + private volatile boolean isCompiledQueryCleanupThreadStarted = false; + + private final Object lockIsCompiledQueryCleanupThreadStarted = new Object(); + + private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask + + private final SocketCloser socketCloser; + + private static final long CLIENT_PING_TASK_PERIOD = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000); + + /** + * package-private to avoid synthetic accessor + */ + static final long CLIENT_PING_TASK_COUNTER = + Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3); + + private final Set blackListedClients = new CopyOnWriteArraySet(); + + /** + * Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance. * - * @param cache The GemFire <code>InternalCache</code> - * @return A <code>CacheClientNotifier</code> instance + * @param cache The GemFire {@code InternalCache} + * @return A {@code CacheClientNotifier} instance */ public static synchronized CacheClientNotifier getInstance(InternalCache cache, CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive, ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) { if (ccnSingleton == null) { ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount, - messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver); + messageTimeToLive, listener, isGatewayReceiver); } if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) { @@ -158,20 +286,72 @@ public class CacheClientNotifier { } /** + * @param cache The GemFire {@code InternalCache} + * @param listener a listener which should receive notifications abouts queues being added or + * removed. + */ + private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats, + int maximumMessageCount, int messageTimeToLive, ConnectionListener listener, + boolean isGatewayReceiver) { + // Set the Cache + setCache(cache); + this.acceptorStats = acceptorStats; + // we only need one thread per client and wait 50ms for close + this.socketCloser = new SocketCloser(1, 50); + + // Set the LogWriter + this.logWriter = (InternalLogWriter) cache.getLogger(); + + this.connectionListener = listener; + + // Set the security LogWriter + this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger(); + + this.maximumMessageCount = maximumMessageCount; + this.messageTimeToLive = messageTimeToLive; + + // Initialize the statistics + StatisticsFactory factory; + if (isGatewayReceiver) { + factory = new DummyStatisticsFactory(); + } else { + factory = getCache().getDistributedSystem(); + } + this.statistics = new CacheClientNotifierStats(factory); + + try { + this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); + if (this.logFrequency <= 0) { + this.logFrequency = DEFAULT_LOG_FREQUENCY; + } + } catch (Exception e) { + this.logFrequency = DEFAULT_LOG_FREQUENCY; + } + + eventEnqueueWaitTime = + Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); + if (eventEnqueueWaitTime < 0) { + eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; + } + + // Schedule task to periodically ping clients. + scheduleClientPingTask(); + } + + /** * Writes a given message to the output stream * - * @param dos the <code>DataOutputStream</code> to use for writing the message + * @param dos the {@code DataOutputStream} to use for writing the message * @param type a byte representing the message type - * @param p_msg the message to be written; can be null + * @param message the message to be written; can be null */ - private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion) + private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion) throws IOException { - writeMessage(dos, type, p_msg, clientVersion, (byte) 0x00, 0); + writeMessage(dos, type, message, clientVersion, (byte) 0x00, 0); } - private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion, + private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion, byte epType, int qSize) throws IOException { - String msg = p_msg; // write the message type dos.writeByte(type); @@ -181,6 +361,7 @@ public class CacheClientNotifier { // dummy qSize dos.writeInt(qSize); + String msg = message; if (msg == null) { msg = ""; } @@ -188,10 +369,10 @@ public class CacheClientNotifier { if (clientVersion != null && clientVersion.compareTo(Version.GFE_61) >= 0) { // get all the instantiators. Instantiator[] instantiators = InternalInstantiator.getInstantiators(); - HashMap instantiatorMap = new HashMap(); + Map instantiatorMap = new HashMap(); if (instantiators != null && instantiators.length > 0) { for (Instantiator instantiator : instantiators) { - ArrayList instantiatorAttributes = new ArrayList(); + List<String> instantiatorAttributes = new ArrayList<>(); instantiatorAttributes.add(instantiator.getClass().toString().substring(6)); instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6)); instantiatorMap.put(instantiator.getId(), instantiatorAttributes); @@ -201,15 +382,14 @@ public class CacheClientNotifier { // get all the dataserializers. DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers(); - HashMap<Integer, ArrayList<String>> dsToSupportedClasses = - new HashMap<Integer, ArrayList<String>>(); - HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>(); + Map<Integer, List<String>> dsToSupportedClasses = new HashMap<>(); + Map<Integer, String> dataSerializersMap = new HashMap<>(); if (dataSerializers != null && dataSerializers.length > 0) { for (DataSerializer dataSerializer : dataSerializers) { dataSerializersMap.put(dataSerializer.getId(), dataSerializer.getClass().toString().substring(6)); if (clientVersion.compareTo(Version.GFE_6516) >= 0) { - ArrayList<String> supportedClassNames = new ArrayList<String>(); + List<String> supportedClassNames = new ArrayList<>(); for (Class clazz : dataSerializer.getSupportedClasses()) { supportedClassNames.add(clazz.getName()); } @@ -228,7 +408,7 @@ public class CacheClientNotifier { /** * Writes an exception message to the socket * - * @param dos the <code>DataOutputStream</code> to use for writing the message + * @param dos the {@code DataOutputStream} to use for writing the message * @param type a byte representing the exception type * @param ex the exception to be written; should not be null */ @@ -245,7 +425,7 @@ public class CacheClientNotifier { public void registerClient(Socket socket, boolean isPrimary, long acceptorId, boolean notifyBySubscription) throws IOException { // Since no remote ports were specified in the message, wait for them. - long startTime = this._statistics.startTime(); + long startTime = this.statistics.startTime(); DataInputStream dis = new DataInputStream(socket.getInputStream()); DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); @@ -261,7 +441,7 @@ public class CacheClientNotifier { SocketAddress sa = socket.getRemoteSocketAddress(); UnsupportedVersionException uve = e; if (sa != null) { - String sInfo = " Client: " + sa.toString() + "."; + String sInfo = " Client: " + sa + "."; uve = new UnsupportedVersionException(e.getMessage() + sInfo); } logger.warn( @@ -272,8 +452,7 @@ public class CacheClientNotifier { return; } - // Read and ignore the reply code. This is used on the client to server - // handshake. + // Read and ignore the reply code. This is used on the client to server handshake. dis.readByte(); // replyCode if (Version.GFE_57.compareTo(clientVersion) <= 0) { @@ -289,7 +468,7 @@ public class CacheClientNotifier { } } - protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket, + private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket, boolean isPrimary, long startTime, Version clientVersion, long acceptorId, boolean notifyBySubscription) throws IOException { // Read the ports and throw them away. We no longer need them @@ -299,9 +478,6 @@ public class CacheClientNotifier { } // Read the handshake identifier and convert it to a string member id ClientProxyMembershipID proxyID = null; - CacheClientProxy proxy; - AccessControl authzCallback = null; - byte clientConflation = HandShake.CONFLATION_DEFAULT; try { proxyID = ClientProxyMembershipID.readCanonicalized(dis); if (getBlacklistedClient().contains(proxyID)) { @@ -309,13 +485,14 @@ public class CacheClientNotifier { new Exception("This client is blacklisted by server"), clientVersion); return; } - proxy = getClientProxy(proxyID); + CacheClientProxy proxy = getClientProxy(proxyID); DistributedMember member = proxyID.getDistributedMember(); - DistributedSystem system = this.getCache().getDistributedSystem(); + DistributedSystem system = getCache().getDistributedSystem(); Properties sysProps = system.getProperties(); String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR); + byte clientConflation; if (clientVersion.compareTo(Version.GFE_603) >= 0) { byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()}); clientConflation = overrides[0]; @@ -339,27 +516,23 @@ public class CacheClientNotifier { Properties credentials = HandShake.readCredentials(dis, dos, system); if (credentials != null && proxy != null) { - if (securityLogWriter.fineEnabled()) { - securityLogWriter + if (this.securityLogWriter.fineEnabled()) { + this.securityLogWriter .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID); } Object subject = HandShake.verifyCredentials(authenticator, credentials, system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member); if (subject instanceof Principal) { Principal principal = (Principal) subject; - if (securityLogWriter.fineEnabled()) { - securityLogWriter + if (this.securityLogWriter.fineEnabled()) { + this.securityLogWriter .fine("CacheClientNotifier: successfully verified credentials for proxyID: " + proxyID + " having principal: " + principal.getName()); } String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP); - if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) { - if (principal == null) { - securityLogWriter.warning( - LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1, - new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID}); - } + AccessControl authzCallback = null; + if (postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty()) { Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName); authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null); authzCallback.init(principal, member, this.getCache()); @@ -374,13 +547,13 @@ public class CacheClientNotifier { LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0 .toLocalizedString(e)); } catch (AuthenticationRequiredException ex) { - securityLogWriter.warning( + this.securityLogWriter.warning( LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ex}); writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion); return; } catch (AuthenticationFailedException ex) { - securityLogWriter.warning( + this.securityLogWriter.warning( LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, new Object[] {proxyID, ex}); writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion); @@ -389,11 +562,10 @@ public class CacheClientNotifier { logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1, new Object[] {this, e}), e); - IOException io = new IOException( + throw new IOException( LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0 - .toLocalizedString(e.getMessage())); - io.initCause(e); - throw io; + .toLocalizedString(e.getMessage()), + e); } catch (Exception ex) { logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, @@ -402,7 +574,7 @@ public class CacheClientNotifier { return; } - this._statistics.endClientRegistration(startTime); + this.statistics.endClientRegistration(startTime); } /** @@ -410,14 +582,13 @@ public class CacheClientNotifier { * * @param socket The socket over which the server communicates with the client. * @param proxyId The distributed member id of the client being registered - * @param proxy The <code>CacheClientProxy</code> of the given <code>proxyId</code> + * @param proxy The {@code CacheClientProxy} of the given {@code proxyId} * * @return CacheClientProxy for the registered client */ private CacheClientProxy registerClient(Socket socket, ClientProxyMembershipID proxyId, CacheClientProxy proxy, boolean isPrimary, byte clientConflation, Version clientVersion, long acceptorId, boolean notifyBySubscription) throws IOException, CacheException { - CacheClientProxy l_proxy = proxy; // Initialize the socket socket.setTcpNoDelay(true); @@ -431,9 +602,6 @@ public class CacheClientNotifier { } // Determine whether the client is durable or not. - byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT; - String unsuccessfulMsg = null; - boolean successful = true; boolean clientIsDurable = proxyId.isDurable(); if (logger.isDebugEnabled()) { if (clientIsDurable) { @@ -446,8 +614,11 @@ public class CacheClientNotifier { byte epType = 0x00; int qSize = 0; + byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT; + String unsuccessfulMsg = null; + boolean successful = true; if (clientIsDurable) { - if (l_proxy == null) { + if (proxy == null) { if (isTimedOut(proxyId)) { qSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT; } else { @@ -459,9 +630,9 @@ public class CacheClientNotifier { "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.", proxyId.getDurableId()); } - l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, + proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); - successful = this.initializeProxy(l_proxy); + successful = this.initializeProxy(proxy); } else { if (proxy.isPrimary()) { epType = (byte) 2; @@ -470,27 +641,27 @@ public class CacheClientNotifier { } qSize = proxy.getQueueSize(); // A proxy exists for this durable client. It must be reinitialized. - if (l_proxy.isPaused()) { + if (proxy.isPaused()) { if (CacheClientProxy.testHook != null) { CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT"); } - if (l_proxy.lockDrain()) { + if (proxy.lockDrain()) { try { if (logger.isDebugEnabled()) { logger.debug( "CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}", - proxyId.getDurableId(), l_proxy); + proxyId.getDurableId(), proxy); } - this._statistics.incDurableReconnectionCount(); - l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout()); - l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation, + this.statistics.incDurableReconnectionCount(); + proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout()); + proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation, clientVersion); - l_proxy.setMarkerEnqueued(true); + proxy.setMarkerEnqueued(true); if (CacheClientProxy.testHook != null) { CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED"); } } finally { - l_proxy.unlockDrain(); + proxy.unlockDrain(); } } else { unsuccessfulMsg = @@ -507,7 +678,7 @@ public class CacheClientNotifier { // client is already using this durable id. unsuccessfulMsg = LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_THE_REQUESTED_DURABLE_CLIENT_HAS_THE_SAME_IDENTIFIER__0__AS_AN_EXISTING_DURABLE_CLIENT__1__DUPLICATE_DURABLE_CLIENTS_ARE_NOT_ALLOWED - .toLocalizedString(new Object[] {proxyId.getDurableId(), proxy}); + .toLocalizedString(proxyId.getDurableId(), proxy); logger.warn(unsuccessfulMsg); // Set the unsuccessful response byte. responseByte = HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT; @@ -537,18 +708,18 @@ public class CacheClientNotifier { if (toCreateNewProxy) { // Create the new proxy for this non-durable client - l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, + proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); - successful = this.initializeProxy(l_proxy); + successful = this.initializeProxy(proxy); } } if (!successful) { - l_proxy = null; + proxy = null; responseByte = HandShake.REPLY_REFUSED; unsuccessfulMsg = LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_A_PREVIOUS_CONNECTION_ATTEMPT_FROM_THIS_CLIENT_IS_STILL_BEING_PROCESSED__0 - .toLocalizedString(new Object[] {proxyId}); + .toLocalizedString(proxyId); } // Tell the client that the proxy has been registered using the response @@ -562,10 +733,10 @@ public class CacheClientNotifier { // write the message type, message length and the error message (if any) writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize); } catch (IOException ioe) {// remove the added proxy if we get IOException. - if (l_proxy != null) { - boolean keepProxy = l_proxy.close(false, false); // do not check for queue, just close it + if (proxy != null) { + boolean keepProxy = proxy.close(false, false); // do not check for queue, just close it if (!keepProxy) { - removeClientProxy(l_proxy); + removeClientProxy(proxy); } } throw ioe; @@ -580,41 +751,39 @@ public class CacheClientNotifier { // will ensure that the response byte is sent to the client before // the marker message. If the client is durable, the message processor // is not started until the clientReady message is received. - if (!clientIsDurable && l_proxy != null - && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) { + if (!clientIsDurable && proxy != null && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) { // The startOrResumeMessageDispatcher tests if the proxy is a primary. // If this is a secondary proxy, the dispatcher is not started. // The false parameter signifies that a marker message has not already been // processed. This will generate and send one. - l_proxy.startOrResumeMessageDispatcher(false); + proxy.startOrResumeMessageDispatcher(false); } if (responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) { if (logger.isDebugEnabled()) { - logger.debug("CacheClientNotifier: Successfully registered {}", l_proxy); + logger.debug("CacheClientNotifier: Successfully registered {}", proxy); } } else { logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_UNSUCCESSFULLY_REGISTERED_CLIENT_WITH_IDENTIFIER__0, proxyId)); } - return l_proxy; + return proxy; } - private boolean initializeProxy(CacheClientProxy l_proxy) throws IOException, CacheException { - boolean status = false; - if (!this.isProxyInInitializationMode(l_proxy)) { + private boolean initializeProxy(CacheClientProxy proxy) throws CacheException { + if (!this.isProxyInInitializationMode(proxy)) { if (logger.isDebugEnabled()) { - logger.debug("Initializing proxy: {}", l_proxy); + logger.debug("Initializing proxy: {}", proxy); } try { // Add client proxy to initialization list. This has to be done before // the queue is created so that events can be buffered here for delivery // to the queue once it's initialized (bug #41681 and others) - addClientInitProxy(l_proxy); - l_proxy.initializeMessageDispatcher(); + addClientInitProxy(proxy); + proxy.initializeMessageDispatcher(); // Initialization success. Add to client proxy list. - addClientProxy(l_proxy); + addClientProxy(proxy); return true; } catch (RegionExistsException ree) { if (logger.isDebugEnabled()) { @@ -624,10 +793,10 @@ public class CacheClientNotifier { } // This will return false; } finally { - removeClientInitProxy(l_proxy); + removeClientInitProxy(proxy); } } - return status; + return false; } /** @@ -670,9 +839,9 @@ public class CacheClientNotifier { boolean success = false; CacheClientProxy proxy = getClientProxy(proxyId); if (proxy != null) { - HARegionQueue harq = proxy.getHARegionQueue(); - harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()), - eid.getSequenceID()); + HARegionQueue haRegionQueue = proxy.getHARegionQueue(); + haRegionQueue.addDispatchedMessage( + new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()), eid.getSequenceID()); success = true; } return success; @@ -690,11 +859,6 @@ public class CacheClientNotifier { } CacheClientProxy proxy = getClientProxy(membershipID); if (proxy != null) { - // Close the port if the proxy represents the client and contains the - // port) - // // If so, remove the port from the client's remote ports - // proxy.removePort(clientPort); - // Set the keepalive flag proxy.setKeepAlive(keepAlive); } } @@ -704,7 +868,7 @@ public class CacheClientNotifier { * * @param memberId Uniquely identifies the client */ - public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) { + void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) { if (logger.isDebugEnabled()) { logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId); } @@ -769,14 +933,16 @@ public class CacheClientNotifier { * notify interested clients of the given cache event using the given update message. The event * should have routing information in it that determines which clients will receive the event. */ - public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) { + public static void notifyClients(InternalCacheEvent event, + ClientUpdateMessage clientUpdateMessage) { CacheClientNotifier instance = ccnSingleton; if (instance != null) { - instance.singletonNotifyClients(event, cmsg); + instance.singletonNotifyClients(event, clientUpdateMessage); } } - private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) { + private void singletonNotifyClients(InternalCacheEvent event, + ClientUpdateMessage clientUpdateMessage) { final boolean isDebugEnabled = logger.isDebugEnabled(); final boolean isTraceEnabled = logger.isTraceEnabled(); @@ -796,20 +962,20 @@ public class CacheClientNotifier { return; } - long startTime = this._statistics.startTime(); + long startTime = this.statistics.startTime(); ClientUpdateMessageImpl clientMessage; - if (cmsg == null) { + if (clientUpdateMessage == null) { clientMessage = constructClientMessage(event); } else { - clientMessage = (ClientUpdateMessageImpl) cmsg; + clientMessage = (ClientUpdateMessageImpl) clientUpdateMessage; } if (clientMessage == null) { return; } // Holds the clientIds to which filter message needs to be sent. - Set<ClientProxyMembershipID> filterClients = new HashSet(); + Set<ClientProxyMembershipID> filterClients = new HashSet<>(); // Add CQ info. if (filterInfo.getCQs() != null) { @@ -868,7 +1034,7 @@ public class CacheClientNotifier { } } - Conflatable conflatable = null; + Conflatable conflatable; if (clientMessage instanceof ClientTombstoneMessage) { // bug #46832 - HAEventWrapper deserialization can't handle subclasses @@ -893,7 +1059,7 @@ public class CacheClientNotifier { singletonRouteClientMessage(conflatable, filterClients); - this._statistics.endEvent(startTime); + this.statistics.endEvent(startTime); // Cleanup destroyed events in CQ result cache. // While maintaining the CQ results key caching. the destroy event @@ -915,7 +1081,7 @@ public class CacheClientNotifier { String cqName = regionProfile.getRealCqID(cqID); if (cqName != null) { ServerCQ cq = regionProfile.getCq(cqName); - if (cq != null && e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY))) { + if (cq != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) { cq.removeFromCqResultKeys(entryEvent.getKey(), true); } } @@ -930,28 +1096,15 @@ public class CacheClientNotifier { public static void routeClientMessage(Conflatable clientMessage) { CacheClientNotifier instance = ccnSingleton; if (instance != null) { - instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok - // to - // use - // keySet - // here - // because - // all - // we - // do - // is - // call - // getClientProxy - // with - // these - // keys + // ok to use keySet here because all we do is call getClientProxy with these keys + instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet()); } } /** * this is for server side registration of client queue */ - public static void routeSingleClientMessage(ClientUpdateMessage clientMessage, + static void routeSingleClientMessage(ClientUpdateMessage clientMessage, ClientProxyMembershipID clientProxyMembershipId) { CacheClientNotifier instance = ccnSingleton; if (instance != null) { @@ -963,27 +1116,25 @@ public class CacheClientNotifier { private void singletonRouteClientMessage(Conflatable conflatable, Collection<ClientProxyMembershipID> filterClients) { - this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified - // but no p2p distribution + this.cache.getCancelCriterion().checkCancelInProgress(null); List<CacheClientProxy> deadProxies = null; for (ClientProxyMembershipID clientId : filterClients) { - CacheClientProxy proxy; - proxy = this.getClientProxy(clientId, true); + CacheClientProxy proxy = this.getClientProxy(clientId, true); if (proxy != null) { if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) { proxy.deliverMessage(conflatable); } else { proxy.getStatistics().incMessagesFailedQueued(); if (deadProxies == null) { - deadProxies = new ArrayList<CacheClientProxy>(); + deadProxies = new ArrayList<>(); } deadProxies.add(proxy); } - this.blackListSlowReciever(proxy); + this.blackListSlowReceiver(proxy); } } - checkAndRemoveFromClientMsgsRegion(conflatable); + checkAndRemoveFromClientMessagesRegion(conflatable); // Remove any dead clients from the clients to notify if (deadProxies != null) { closeDeadProxies(deadProxies, false); @@ -994,7 +1145,7 @@ public class CacheClientNotifier { * processes the given collection of durable and non-durable client identifiers, returning a * collection of non-durable identifiers of clients connected to this VM */ - public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { + Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { return getProxyIDs(mixedDurableAndNonDurableIDs, false); } @@ -1003,52 +1154,44 @@ public class CacheClientNotifier { * collection of non-durable identifiers of clients connected to this VM. This version can check * for proxies in initialization as well as fully initialized proxies. */ - public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs, + private Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs, boolean proxyInInitMode) { - Set<ClientProxyMembershipID> result = new HashSet(); + Set<ClientProxyMembershipID> result = new HashSet<>(); for (Object id : mixedDurableAndNonDurableIDs) { if (id instanceof String) { CacheClientProxy clientProxy = getClientProxy((String) id, true); if (clientProxy != null) { result.add(clientProxy.getProxyID()); } - // else { we don't have a proxy for the given durable ID } } else { // try to canonicalize the ID. CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID) id, true); if (proxy != null) { - // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: found match for " + id + ": " + - // proxy.getProxyID()); result.add(proxy.getProxyID()); - } else { - // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: did not find match for " + id); - // this was causing OOMEs in HARegion initial image processing because - // messages had routing for clients unknown to this server - // result.add((ClientProxyMembershipID)id); } } } return result; } - private void blackListSlowReciever(CacheClientProxy clientProxy) { + private void blackListSlowReceiver(CacheClientProxy clientProxy) { final CacheClientProxy proxy = clientProxy; - if ((proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever()) - && !blackListedClients.contains(proxy.getProxyID())) { + if (proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever() + && !this.blackListedClients.contains(proxy.getProxyID())) { // log alert with client info. logger.warn( LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CLIENT_0_IS_A_SLOW_RECEIVER, new Object[] {proxy.getProxyID()})); addToBlacklistedClient(proxy.getProxyID()); - InternalDistributedSystem ids = - (InternalDistributedSystem) this.getCache().getDistributedSystem(); - final DM dm = ids.getDistributionManager(); + InternalDistributedSystem system = getCache().getInternalDistributedSystem(); + final DM dm = system.getDistributionManager(); + dm.getWaitingThreadPool().execute(new Runnable() { + @Override public void run() { CacheDistributionAdvisor advisor = - ((DistributedRegion) proxy.getHARegionQueue().getRegion()) - .getCacheDistributionAdvisor(); + proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor(); Set members = advisor.adviseCacheOp(); // Send client blacklist message @@ -1074,25 +1217,24 @@ public class CacheClientNotifier { } /** - * Initializes a <code>ClientUpdateMessage</code> from an operation and event + * Initializes a {@code ClientUpdateMessage} from an operation and event * * @param operation The operation that occurred (e.g. AFTER_CREATE) * @param event The event containing the data to be updated - * @return a <code>ClientUpdateMessage</code> + * @return a {@code ClientUpdateMessage} */ - private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) - throws Exception { + private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) { if (!supportsOperation(operation)) { - throw new Exception( + throw new UnsupportedOperationException( LocalizedStrings.CacheClientNotifier_THE_CACHE_CLIENT_NOTIFIER_DOES_NOT_SUPPORT_OPERATIONS_OF_TYPE_0 .toLocalizedString(operation)); } - // String regionName = event.getRegion().getFullPath(); + Object keyOfInterest = null; final EventID eventIdentifier; ClientProxyMembershipID membershipID = null; boolean isNetLoad = false; - Object callbackArgument = null; + Object callbackArgument; byte[] delta = null; VersionTag versionTag = null; @@ -1129,19 +1271,19 @@ public class CacheClientNotifier { } if (isNetLoad) { - clientUpdateMsg.setIsNetLoad(isNetLoad); + clientUpdateMsg.setIsNetLoad(true); } return clientUpdateMsg; } /** - * Returns whether the <code>CacheClientNotifier</code> supports the input operation. + * Returns whether the {@code CacheClientNotifier} supports the input operation. * * @param operation The operation that occurred (e.g. AFTER_CREATE) - * @return whether the <code>CacheClientNotifier</code> supports the input operation + * @return whether the {@code CacheClientNotifier} supports the input operation */ - protected boolean supportsOperation(EnumListenerEvent operation) { + private boolean supportsOperation(EnumListenerEvent operation) { return operation == EnumListenerEvent.AFTER_CREATE || operation == EnumListenerEvent.AFTER_UPDATE || operation == EnumListenerEvent.AFTER_DESTROY @@ -1211,7 +1353,7 @@ public class CacheClientNotifier { int regionDataPolicy) { if (regionDataPolicy == 0) { if (!regionsWithEmptyDataPolicy.containsKey(regionName)) { - regionsWithEmptyDataPolicy.put(regionName, Integer.valueOf(0)); + regionsWithEmptyDataPolicy.put(regionName, 0); } } } @@ -1222,8 +1364,8 @@ public class CacheClientNotifier { * @param regionName The name of the region of interest * @param keyOfInterest The name of the key of interest * @param isClosing Whether the caller is closing - * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested - * in this <code>Region</code> and key + * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in + * this {@code Region} and key */ public void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType, boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) { @@ -1244,8 +1386,8 @@ public class CacheClientNotifier { * * @param regionName The name of the region of interest * @param keysOfInterest The list of keys of interest - * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested - * in this <code>Region</code> and key + * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in + * this {@code Region} and key */ public void registerClientInterest(String regionName, List keysOfInterest, ClientProxyMembershipID membershipID, boolean isDurable, boolean sendUpdatesAsInvalidates, @@ -1278,8 +1420,8 @@ public class CacheClientNotifier { * @param regionName The name of the region of interest * @param keysOfInterest The list of keys of interest * @param isClosing Whether the caller is closing - * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested - * in this <code>Region</code> and key + * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in + * this {@code Region} and key */ public void unregisterClientInterest(String regionName, List keysOfInterest, boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) { @@ -1301,21 +1443,22 @@ public class CacheClientNotifier { * * @since GemFire 5.7 */ - private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) { - if (haContainer == null) { + private void checkAndRemoveFromClientMessagesRegion(Conflatable conflatable) { + if (this.haContainer == null) { return; } + if (conflatable instanceof HAEventWrapper) { HAEventWrapper wrapper = (HAEventWrapper) conflatable; if (!wrapper.getIsRefFromHAContainer()) { - wrapper = (HAEventWrapper) haContainer.getKey(wrapper); + wrapper = (HAEventWrapper) this.haContainer.getKey(wrapper); if (wrapper != null && !wrapper.getPutInProgress()) { synchronized (wrapper) { if (wrapper.getReferenceCount() == 0L) { if (logger.isDebugEnabled()) { logger.debug("Removing event from haContainer: {}", wrapper); } - haContainer.remove(wrapper); + this.haContainer.remove(wrapper); } } } @@ -1328,7 +1471,7 @@ public class CacheClientNotifier { if (logger.isDebugEnabled()) { logger.debug("Removing event from haContainer: {}", wrapper); } - haContainer.remove(wrapper); + this.haContainer.remove(wrapper); } } } @@ -1336,12 +1479,12 @@ public class CacheClientNotifier { } /** - * Returns the <code>CacheClientProxy</code> associated to the membershipID * + * Returns the {@code CacheClientProxy} associated to the membershipID * * - * @return the <code>CacheClientProxy</code> associated to the membershipID + * @return the {@code CacheClientProxy} associated to the membershipID */ public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) { - return (CacheClientProxy) this._clientProxies.get(membershipID); + return (CacheClientProxy) this.clientProxies.get(membershipID); } /** @@ -1352,25 +1495,25 @@ public class CacheClientNotifier { boolean proxyInInitMode) { CacheClientProxy proxy = getClientProxy(membershipID); if (proxyInInitMode && proxy == null) { - proxy = (CacheClientProxy) this._initClientProxies.get(membershipID); + proxy = (CacheClientProxy) this.initClientProxies.get(membershipID); } return proxy; } /** - * Returns the <code>CacheClientProxy</code> associated to the durableClientId + * Returns the {@code CacheClientProxy} associated to the durableClientId * - * @return the <code>CacheClientProxy</code> associated to the durableClientId + * @return the {@code CacheClientProxy} associated to the durableClientId */ public CacheClientProxy getClientProxy(String durableClientId) { return getClientProxy(durableClientId, false); } /** - * Returns the <code>CacheClientProxy</code> associated to the durableClientId. This version of - * the method can check for initializing proxies as well as fully initialized proxies. + * Returns the {@code CacheClientProxy} associated to the durableClientId. This version of the + * method can check for initializing proxies as well as fully initialized proxies. * - * @return the <code>CacheClientProxy</code> associated to the durableClientId + * @return the {@code CacheClientProxy} associated to the durableClientId */ public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -1379,9 +1522,9 @@ public class CacheClientNotifier { if (isDebugEnabled) { logger.debug("CacheClientNotifier: Determining client for {}", durableClientId); } + CacheClientProxy proxy = null; - for (Iterator i = getClientProxies().iterator(); i.hasNext();) { - CacheClientProxy clientProxy = (CacheClientProxy) i.next(); + for (CacheClientProxy clientProxy : getClientProxies()) { if (isTraceEnabled) { logger.trace("CacheClientNotifier: Checking client {}", clientProxy); } @@ -1394,9 +1537,10 @@ public class CacheClientNotifier { break; } } + if (proxy == null && proxyInInitMode) { - for (Iterator i = this._initClientProxies.values().iterator(); i.hasNext();) { - CacheClientProxy clientProxy = (CacheClientProxy) i.next(); + for (Object clientProxyObject : this.initClientProxies.values()) { + CacheClientProxy clientProxy = (CacheClientProxy) clientProxyObject; if (isTraceEnabled) { logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy); } @@ -1415,37 +1559,6 @@ public class CacheClientNotifier { } /** - * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID * - * - * @return the <code>CacheClientProxy</code> associated to the same distributed system - */ - public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) { - final boolean isDebugEnabled = logger.isDebugEnabled(); - if (isDebugEnabled) { - logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this, - membershipID); - logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}", - this, getClientProxies().size()); - } - CacheClientProxy proxy = null; - for (Iterator i = getClientProxies().iterator(); i.hasNext();) { - CacheClientProxy clientProxy = (CacheClientProxy) i.next(); - if (isDebugEnabled) { - logger.debug("CacheClientNotifier: Checking client {}", clientProxy); - } - if (clientProxy.isSameDSMember(membershipID)) { - proxy = clientProxy; - if (isDebugEnabled) { - logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy, - membershipID); - } - break; - } - } - return proxy; - } - - /** * It will remove the clients connected to the passed acceptorId. If its the only server, shuts * down this instance. */ @@ -1453,10 +1566,10 @@ public class CacheClientNotifier { final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}", - this.getCache().getCacheServers().size()); + getCache().getCacheServers().size()); } - Iterator it = this._clientProxies.values().iterator(); + Iterator it = this.clientProxies.values().iterator(); // Close all the client proxies while (it.hasNext()) { CacheClientProxy proxy = (CacheClientProxy) it.next(); @@ -1478,27 +1591,27 @@ public class CacheClientNotifier { if (noActiveServer() && ccnSingleton != null) { ccnSingleton = null; - if (haContainer != null) { - haContainer.cleanUp(); + if (this.haContainer != null) { + this.haContainer.cleanUp(); if (isDebugEnabled) { - logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName()); + logger.debug("haContainer ({}) is now cleaned up.", this.haContainer.getName()); } } this.clearCompiledQueries(); - blackListedClients.clear(); + this.blackListedClients.clear(); // cancel the ping task this.clientPingTask.cancel(); // Close the statistics - this._statistics.close(); + this.statistics.close(); this.socketCloser.close(); } } private boolean noActiveServer() { - for (CacheServer server : this.getCache().getCacheServers()) { + for (CacheServer server : getCache().getCacheServers()) { if (server.isRunning()) { return false; } @@ -1507,41 +1620,40 @@ public class CacheClientNotifier { } /** - * Adds a new <code>CacheClientProxy</code> to the list of known client proxies + * Adds a new {@code CacheClientProxy} to the list of known client proxies * - * @param proxy The <code>CacheClientProxy</code> to add + * @param proxy The {@code CacheClientProxy} to add */ - protected void addClientProxy(CacheClientProxy proxy) throws IOException { - // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy); + void addClientProxy(CacheClientProxy proxy) { getCache(); // ensure cache reference is up to date so firstclient state is correct - this._clientProxies.put(proxy.getProxyID(), proxy); + this.clientProxies.put(proxy.getProxyID(), proxy); // Remove this proxy from the init proxy list. removeClientInitProxy(proxy); - this._connectionListener.queueAdded(proxy.getProxyID()); - if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { + this.connectionListener.queueAdded(proxy.getProxyID()); + if (proxy.clientConflation != HandShake.CONFLATION_ON) { // Delta not supported with conflation ON - ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); + ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance(); /* * #41788 - If the client connection init starts while cache/member is shutting down, * ClientHealthMonitor.getInstance() might return null. */ - if (chm != null) { - chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal()); + if (clientHealthMonitor != null) { + clientHealthMonitor.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal()); } } this.timedOutDurableClientProxies.remove(proxy.getProxyID()); } - protected void addClientInitProxy(CacheClientProxy proxy) throws IOException { - this._initClientProxies.put(proxy.getProxyID(), proxy); + private void addClientInitProxy(CacheClientProxy proxy) { + this.initClientProxies.put(proxy.getProxyID(), proxy); } - protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException { - this._initClientProxies.remove(proxy.getProxyID()); + private void removeClientInitProxy(CacheClientProxy proxy) { + this.initClientProxies.remove(proxy.getProxyID()); } - protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException { - return this._initClientProxies.containsKey(proxy.getProxyID()); + private boolean isProxyInInitializationMode(CacheClientProxy proxy) { + return this.initClientProxies.containsKey(proxy.getProxyID()); } /** @@ -1552,8 +1664,7 @@ public class CacheClientNotifier { */ public Set getActiveClients() { Set clients = new HashSet(); - for (Iterator iter = getClientProxies().iterator(); iter.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) iter.next(); + for (CacheClientProxy proxy : getClientProxies()) { if (proxy.hasRegisteredInterested()) { ClientProxyMembershipID proxyID = proxy.getProxyID(); clients.add(proxyID); @@ -1569,8 +1680,8 @@ public class CacheClientNotifier { */ public Map getAllClients() { Map clients = new HashMap(); - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) iter.next(); + for (final Object o : this.clientProxies.values()) { + CacheClientProxy proxy = (CacheClientProxy) o; ClientProxyMembershipID proxyID = proxy.getProxyID(); clients.put(proxyID, new CacheClientStatus(proxyID)); } @@ -1586,8 +1697,8 @@ public class CacheClientNotifier { * @since GemFire 5.6 */ public boolean hasDurableClient(String durableId) { - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) iter.next(); + for (Object clientProxyObject : this.clientProxies.values()) { + CacheClientProxy proxy = (CacheClientProxy) clientProxyObject; ClientProxyMembershipID proxyID = proxy.getProxyID(); if (durableId.equals(proxyID.getDurableId())) { return true; @@ -1605,15 +1716,11 @@ public class CacheClientNotifier { * @since GemFire 5.6 */ public boolean hasPrimaryForDurableClient(String durableId) { - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) iter.next(); + for (Object clientProxyObject : this.clientProxies.values()) { + CacheClientProxy proxy = (CacheClientProxy) clientProxyObject; ClientProxyMembershipID proxyID = proxy.getProxyID(); if (durableId.equals(proxyID.getDurableId())) { - if (proxy.isPrimary()) { - return true; - } else { - return false; - } + return proxy.isPrimary(); } } return false; @@ -1626,9 +1733,9 @@ public class CacheClientNotifier { */ public Map getClientQueueSizes() { Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap(); - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) iter.next(); - queueSizes.put(proxy.getProxyID(), Integer.valueOf(proxy.getQueueSize())); + for (Object clientProxyObject : this.clientProxies.values()) { + CacheClientProxy proxy = (CacheClientProxy) clientProxyObject; + queueSizes.put(proxy.getProxyID(), proxy.getQueueSize()); } return queueSizes; } @@ -1645,25 +1752,20 @@ public class CacheClientNotifier { public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException { CacheClientProxy proxy = getClientProxy(durableClientId); // close and drain - if (proxy != null) { - return proxy.closeClientCq(clientCQName); - } - return false; + return proxy != null && proxy.closeClientCq(clientCQName); } /** - * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies + * Removes an existing {@code CacheClientProxy} from the list of known client proxies * - * @param proxy The <code>CacheClientProxy</code> to remove + * @param proxy The {@code CacheClientProxy} to remove */ - protected void removeClientProxy(CacheClientProxy proxy) { - // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new - // Exception("stack trace")); + void removeClientProxy(CacheClientProxy proxy) { ClientProxyMembershipID client = proxy.getProxyID(); - this._clientProxies.remove(client); - this._connectionListener.queueRemoved(); - this.getCache().cleanupForClient(this, client); - if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { + this.clientProxies.remove(client); + this.connectionListener.queueRemoved(); + getCache().cleanupForClient(this, client); + if (proxy.clientConflation != HandShake.CONFLATION_ON) { ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); if (chm != null) { chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal()); @@ -1675,18 +1777,18 @@ public class CacheClientNotifier { this.timedOutDurableClientProxies.add(client); } - public boolean isTimedOut(ClientProxyMembershipID client) { + private boolean isTimedOut(ClientProxyMembershipID client) { return this.timedOutDurableClientProxies.contains(client); } /** - * Returns an unmodifiable Collection of known <code>CacheClientProxy</code> instances. The - * collection is not static so its contents may change. + * Returns an unmodifiable Collection of known {@code CacheClientProxy} instances. The collection + * is not static so its contents may change. * - * @return the collection of known <code>CacheClientProxy</code> instances + * @return the collection of known {@code CacheClientProxy} instances */ public Collection<CacheClientProxy> getClientProxies() { - return Collections.unmodifiableCollection(this._clientProxies.values()); + return Collections.unmodifiableCollection(this.clientProxies.values()); } private void closeAllClientCqs(CacheClientProxy proxy) { @@ -1698,12 +1800,12 @@ public class CacheClientNotifier { logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy); } cqService.closeClientCqs(proxy.getProxyID()); - } catch (CqException e1) { + } catch (CqException e) { logger.warn(LocalizedMessage.create( LocalizedStrings.CacheClientNotifier_UNABLE_TO_CLOSE_CQS_FOR_THE_CLIENT__0, proxy.getProxyID())); if (isDebugEnabled) { - e1.printStackTrace(); + logger.debug(e); } } } @@ -1732,16 +1834,17 @@ public class CacheClientNotifier { } /** - * Close dead <code>CacheClientProxy</code> instances + * Close dead {@code CacheClientProxy} instances * - * @param deadProxies The list of <code>CacheClientProxy</code> instances to close + * @param deadProxies The list of {@code CacheClientProxy} instances to close */ private void closeDeadProxies(List deadProxies, boolean stoppedNormally) { final boolean isDebugEnabled = logger.isDebugEnabled(); - for (Iterator i = deadProxies.iterator(); i.hasNext();) { - CacheClientProxy proxy = (CacheClientProxy) i.next(); - if (isDebugEnabled) + for (Object deadProxy : deadProxies) { + CacheClientProxy proxy = (CacheClientProxy) deadProxy; + if (isDebugEnabled) { logger.debug("CacheClientNotifier: Closing dead client: {}", proxy); + } // Close the proxy boolean keepProxy = false; @@ -1757,8 +1860,7 @@ public class CacheClientNotifier { if (keepProxy) { logger.info(LocalizedMessage.create( LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_KEEPING_PROXY_FOR_DURABLE_CLIENT_NAMED_0_FOR_1_SECONDS_2, - new Object[] {proxy.getDurableId(), Integer.valueOf(proxy.getDurableTimeout()), - proxy})); + new Object[] {proxy.getDurableId(), proxy.getDurableTimeout(), proxy})); } else { closeAllClientCqs(proxy); if (isDebugEnabled) { @@ -1771,10 +1873,10 @@ public class CacheClientNotifier { } /** - * Registers a new <code>InterestRegistrationListener</code> with the set of - * <code>InterestRegistrationListener</code>s. + * Registers a new {@code InterestRegistrationListener} with the set of + * {@code InterestRegistrationListener}s. * - * @param listener The <code>InterestRegistrationListener</code> to register + * @param listener The {@code InterestRegistrationListener} to register * * @since GemFire 5.8Beta */ @@ -1783,10 +1885,10 @@ public class CacheClientNotifier { } /** - * Unregisters an existing <code>InterestRegistrationListener</code> from the set of - * <code>InterestRegistrationListener</code>s. + * Unregisters an existing {@code InterestRegistrationListener} from the set of + * {@code InterestRegistrationListener}s. * - * @param listener The <code>InterestRegistrationListener</code> to unregister + * @param listener The {@code InterestRegistrationListener} to unregister * * @since GemFire 5.8Beta */ @@ -1795,11 +1897,11 @@ public class CacheClientNotifier { } /** - * Returns a read-only collection of <code>InterestRegistrationListener</code>s registered with - * this notifier. + * Returns a read-only collection of {@code InterestRegistrationListener}s registered with this + * notifier. * - * @return a read-only collection of <code>InterestRegistrationListener</code>s registered with - * this notifier + * @return a read-only collection of {@code InterestRegistrationListener}s registered with this + * notifier * * @since GemFire 5.8Beta */ @@ -1811,17 +1913,17 @@ public class CacheClientNotifier { * * @since GemFire 5.8Beta */ - protected boolean containsInterestRegistrationListeners() { + boolean containsInterestRegistrationListeners() { return !this.writableInterestRegistrationListeners.isEmpty(); } /** - * * @since GemFire 5.8Beta */ - protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) { - for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) { - InterestRegistrationListener listener = (InterestRegistrationListener) i.next(); + void notifyInterestRegistrationListeners(InterestRegistrationEvent event) { + for (Object writableInterestRegistrationListener : this.writableInterestRegistrationListeners) { + InterestRegistrationListener listener = + (InterestRegistrationListener) writableInterestRegistrationListener; if (event.isRegister()) { listener.afterRegisterInterest(event); } else { @@ -1836,207 +1938,70 @@ public class CacheClientNotifier { * @return the statistics for the notifier */ public CacheClientNotifierStats getStats() { - return this._statistics; + return this.statistics; } /** - * Returns this <code>CacheClientNotifier</code>'s <code>InternalCache</code>. + * Returns this {@code CacheClientNotifier}'s {@code InternalCache}. * - * @return this <code>CacheClientNotifier</code>'s <code>InternalCache</code> + * @return this {@code CacheClientNotifier}'s {@code InternalCache} */ protected InternalCache getCache() { // TODO:SYNC: looks wrong - if (this._cache != null && this._cache.isClosed()) { + if (this.cache != null && this.cache.isClosed()) { InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { - this._cache = cache; + this.cache = cache; this.logWriter = cache.getInternalLogWriter(); this.securityLogWriter = cache.getSecurityInternalLogWriter(); } } - return this._cache; + return this.cache; } /** - * Returns this <code>CacheClientNotifier</code>'s maximum message count. + * Returns this {@code CacheClientNotifier}'s maximum message count. * - * @return this <code>CacheClientNotifier</code>'s maximum message count + * @return this {@code CacheClientNotifier}'s maximum message count */ protected int getMaximumMessageCount() { return this.maximumMessageCount; } /** - * Returns this <code>CacheClientNotifier</code>'s message time-to-live. + * Returns this {@code CacheClientNotifier}'s message time-to-live. * - * @return this <code>CacheClientNotifier</code>'s message time-to-live + * @return this {@code CacheClientNotifier}'s message time-to-live */ protected int getMessageTimeToLive() { return this.messageTimeToLive; } - protected void handleInterestEvent(InterestRegistrationEvent event) { + void handleInterestEvent(InterestRegistrationEvent event) { LocalRegion region = (LocalRegion) event.getRegion(); region.handleInterestEvent(event); } - /** - * @param cache The GemFire <code>InternalCache</code> - * @param listener a listener which should receive notifications abouts queues being added or - * removed. - */ - private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats, - int maximumMessageCount, int messageTimeToLive, ConnectionListener listener, - List overflowAttributesList, boolean isGatewayReceiver) { - // Set the Cache - setCache(cache); - this.acceptorStats = acceptorStats; - this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms - // for close - - // Set the LogWriter - this.logWriter = (InternalLogWriter) cache.getLogger(); - - this._connectionListener = listener; - - // Set the security LogWriter - this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger(); - - this.maximumMessageCount = maximumMessageCount; - this.messageTimeToLive = messageTimeToLive; - - // Initialize the statistics - StatisticsFactory factory; - if (isGatewayReceiver) { - factory = new DummyStatisticsFactory(); - } else { - factory = this.getCache().getDistributedSystem(); - } - this._statistics = new CacheClientNotifierStats(factory); - - try { - this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); - if (this.logFrequency <= 0) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - } catch (Exception e) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - - eventEnqueueWaitTime = - Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); - if (eventEnqueueWaitTime < 0) { - eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; - } - - // Schedule task to periodically ping clients. - scheduleClientPingTask(); - } - - /** - * this message is used to send interest registration to another server. Since interest - * registration performs a state-flush operation this message must not transmitted on an ordered - * socket - */ - public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage - implements MessageWithReply { - ClientProxyMembershipID clientId; - ClientInterestMessageImpl clientMessage; - int processorId; - - ServerInterestRegistrationMessage(ClientProxyMembershipID clientID, - ClientInterestMessageImpl msg) { - this.clientId = clientID; - this.clientMessage = msg; - } - - public ServerInterestRegistrationMessage() {} - - static void sendInterestChange(DM dm, ClientProxyMembershipID clientID, - ClientInterestMessageImpl msg) { - ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage(clientID, msg); - Set recipients = dm.getOtherDistributionManagerIds(); - smsg.setRecipients(recipients); - ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients); - smsg.processorId = rp.getProcessorId(); - dm.putOutgoing(smsg); - try { - rp.waitForReplies(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - - @Override - protected void process(DistributionManager dm) { - // Get the proxy for the proxy id - try { - CacheClientNotifier ccn = CacheClientNotifier.getInstance(); - if (ccn != null) { - CacheClientProxy proxy = ccn.getClientProxy(clientId); - // If this VM contains a proxy for the requested proxy id, forward the - // message on to the proxy for processing - if (proxy != null) { - proxy.processInterestMessage(this.clientMessage); - } - } - } finally { - ReplyMessage reply = new ReplyMessage(); - reply.setProcessorId(this.processorId); - reply.setRecipient(getSender()); - try { - dm.putOutgoing(reply); - } catch (CancelException e) { - // can't send a reply, so ignore the exception - } - } - } - - public int getDSFID() { - return SERVER_INTEREST_REGISTRATION_MESSAGE; - } - - @Override - public void toData(DataOutput out) throws IOException { - super.toData(out); - out.writeInt(this.processorId); - InternalDataSerializer.invokeToData(this.clientId, out); - InternalDataSerializer.invokeToData(this.clientMessage, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - super.fromData(in); - this.processorId = in.readInt(); - this.clientId = new ClientProxyMembershipID(); - InternalDataSerializer.invokeFromData(this.clientId, in); - this.clientMessage = new ClientInterestMessageImpl(); - InternalDataSerializer.invokeFromData(this.clientMessage, in); - } - } - - protected void deliverInterestChange(ClientProxyMembershipID proxyID, - ClientInterestMessageImpl message) { - DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem()) - .getDistributionManager(); + void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) { + DM dm = getCache().getInternalDistributedSystem().getDistributionManager(); ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message); } - public CacheServerStats getAcceptorStats() { + CacheServerStats getAcceptorStats() { return this.acceptorStats; } - public SocketCloser getSocketCloser() { + SocketCloser getSocketCloser() { return this.socketCloser; } public void addCompiledQuery(DefaultQuery query) { if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null) { // Added successfully. - this._statistics.incCompiledQueryCount(1); + this.statistics.incCompiledQueryCount(1); if (logger.isDebugEnabled()) { logger.debug( "Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}", - query.getQueryString(), this._statistics.getCompiledQueryCount()); + query.getQueryString(), this.statistics.getCompiledQueryCount()); } // Start the clearIdleCompiledQueries thread. startCompiledQueryCleanupThread(); @@ -2048,13 +2013,13 @@ public class CacheClientNotifier { } private void clearCompiledQueries() { - if (this.compiledQueries.size() > 0) { - this._statistics.incCompiledQueryCount(-(this.compiledQueries.size())); + if (!this.compiledQueries.isEmpty()) { + this.statistics.incCompiledQueryCount(-this.compiledQueries.size()); this.compiledQueries.clear(); if (logger.isDebugEnabled()) { logger.debug( "Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}", - this._statistics.getCompiledQueryCount()); + this.statistics.getCompiledQueryCount()); } } } @@ -2064,7 +2029,7 @@ public class CacheClientNotifier { * checks for the compiled queries that are not used and removes them. */ private void startCompiledQueryCleanupThread() { - if (isCompiledQueryCleanupThreadStarted) { + if (this.isCompiledQueryCleanupThreadStarted) { return; } @@ -2082,11 +2047,11 @@ public class CacheClientNotifier { } else { if (compiledQueries.remove(e.getKey()) != null) { // If successfully removed decrement the counter. - _statistics.incCompiledQueryCount(-1); + statistics.incCompiledQueryCount(-1); if (isDebugEnabled) { logger.debug("Removed compiled query from ccn.compliedQueries list. Query: " + q.getQueryString() + ". Total compiled queries are : " - + _statistics.getCompiledQueryCount()); + + statistics.getCompiledQueryCount()); } } } @@ -2094,23 +2059,23 @@ public class CacheClientNotifier { } }; - synchronized (lockIsCompiledQueryCleanupThreadStarted) { - if (!isCompiledQueryCleanupThreadStarted) { + synchronized (this.lockIsCompiledQueryCleanupThreadStarted) { + if (!this.isCompiledQueryCleanupThreadStarted) { long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0 ? DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME : DefaultQuery.COMPILED_QUERY_CLEAR_TIME; - _cache.getCCPTimer().scheduleAtFixedRate(task, period, period); + this.cache.getCCPTimer().scheduleAtFixedRate(task, period, period); } - isCompiledQueryCleanupThreadStarted = true; + this.isCompiledQueryCleanupThreadStarted = true; } } - protected void scheduleClientPingTask() { + void scheduleClientPingTask() { this.clientPingTask = new SystemTimer.SystemTimerTask() { @Override public void run2() { // If there are no proxies, return - if (CacheClientNotifier.this._clientProxies.isEmpty()) { + if (clientProxies.isEmpty()) { return; } @@ -2145,144 +2110,10 @@ public class CacheClientNotifier { if (logger.isDebugEnabled()) { logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD); } - CacheClientNotifier.this._cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask, + CacheClientNotifier.this.cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask, CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD); } - /** - * A string representing all hosts used for delivery purposes. - */ - protected static final String ALL_HOSTS = "ALL_HOSTS"; - - /** - * An int representing all ports used for delivery purposes. - */ - protected static final int ALL_PORTS = -1; - - /** - * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to - * CacheClientProxy. Note that the keys in this map are not updated when a durable client - * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the - * CacheClientProxy and then call getProxyID on it. - */ - private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _clientProxies = - new ConcurrentHashMap(); - - /** - * The map of <code>CacheClientProxy</code> instances which are getting initialized. Maps - * ClientProxyMembershipID to CacheClientProxy. - */ - private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _initClientProxies = - new ConcurrentHashMap(); - - private final HashSet<ClientProxyMembershipID> timedOutDurableClientProxies = - new HashSet<ClientProxyMembershipID>(); - - /** - * The GemFire <code>InternalCache</code>. Note that since this is a singleton class you should - * not use a direct reference to _cache in CacheClientNotifier code. Instead, you should always - * use <code>getCache()</code> - */ - private InternalCache _cache; - - private InternalLogWriter logWriter; - - /** - * The GemFire security <code>LogWriter</code> - */ - private InternalLogWriter securityLogWriter; - - /** the maximum number of messages that can be enqueued in a client-queue. */ - private int maximumMessageCount; - - /** - * the time (in seconds) after which a message in the client queue will expire. - */ - private int messageTimeToLive; - - /** - * A listener which receives notifications about queues that are added or removed - */ - private ConnectionListener _connectionListener; - - private CacheServerStats acceptorStats; - - /** - * haContainer can hold either the name of the client-messages-region (in case of eviction - * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In - * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value. - */ - private volatile HAContainerWrapper haContainer; - - /** - * The size of the server-to-client communication socket buffers. This can be modified using the - * BridgeServer.SOCKET_BUFFER_SIZE system property. - */ - static final private int socketBufferSize = - Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue(); - - /** - * The statistics for this notifier - */ - protected final CacheClientNotifierStats _statistics; - - /** - * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used - * when modifying the set of listeners. - */ - private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet(); - - /** - * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used to - * provide a read-only <code>Set</code> of listeners. - */ - private final Set readableInterestRegistrationListeners = - Collections.unmodifiableSet(writableInterestRegistrationListeners); - - /** - * System property name for indicating how much frequently the "Queue full" message should be - * logged. - */ - public static final String MAX_QUEUE_LOG_FREQUENCY = - DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit"; - - public static final long DEFAULT_LOG_FREQUENCY = 1000; - - public static final String EVENT_ENQUEUE_WAIT_TIME_NAME = - DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME"; - - public static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100; - - /** - * System property value denoting the time in milliseconds. Any thread putting an event into a - * subscription queue, which is full, will wait this much time for the queue to make space. It'll - * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See - * #51400. - */ - public static int eventEnqueueWaitTime; - - /** - * The frequency of logging the "Queue full" message. - */ - private long logFrequency = DEFAULT_LOG_FREQUENCY; - - private final ConcurrentHashMap<String, DefaultQuery> compiledQueries = - new ConcurrentHashMap<String, DefaultQuery>(); - - private volatile boolean isCompiledQueryCleanupThreadStarted = false; - - private final Object lockIsCompiledQueryCleanupThreadStarted = new Object(); - - private SystemTimer.SystemTimerTask clientPingTask; - - private final SocketCloser socketCloser; - - private static final long CLIENT_PING_TASK_PERIOD = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000); - - private static final long CLIENT_PING_TASK_COUNTER = - Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3); - public long getLogFrequency() { return this.logFrequency; } @@ -2291,64 +2122,153 @@ public class CacheClientNotifier { * @return the haContainer */ public Map getHaContainer() { - return haContainer; + return this.haContainer; } - public void initHaContainer(List overflowAttributesList) { + private void initHaContainer(List overflowAttributesList) { // lazily initialize haContainer in case this CCN instance was created by a gateway receiver if (overflowAttributesList != null && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) { - haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR - + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0), - ((Integer) overflowAttributesList.get(1)).intValue(), - ((Integer) overflowAttributesList.get(2)).intValue(), + this.haContainer = new HAContainerRegion(this.cache.getRegion(Region.SEPARATOR + + CacheServerImpl.clientMessagesRegion(this.cache, (String) overflowAttributesList.get(0), + (Integer) overflowAttributesList.get(1), (Integer) overflowAttributesList.get(2), (String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4)))); } else { - haContainer = new HAContainerMap(new ConcurrentHashMap()); + this.haContainer = new HAContainerMap(new ConcurrentHashMap()); } - assert haContainer != null; + assert this.haContainer != null; if (logger.isDebugEnabled()) { - logger.debug("ha container ({}) has been created.", haContainer.getName()); + logger.debug("ha container ({}) has been created.", this.haContainer.getName()); } } - private final Set blackListedClients = new CopyOnWriteArraySet(); - - public void addToBlacklistedClient(ClientProxyMembershipID proxyID) { - blackListedClients.add(proxyID); + void addToBlacklistedClient(ClientProxyMembershipID proxyID) { + this.blackListedClients.add(proxyID); // ensure that cache and distributed system state are current and open - this.getCache(); + getCache(); new ScheduledThreadPoolExecutor(1).schedule(new ExpireBlackListTask(proxyID), 120, TimeUnit.SECONDS); } - public Set getBlacklistedClient() { - return blackListedClients; + Set getBlacklistedClient() { + return this.blackListedClients; } /** - * @param _cache the _cache to set + * @param cache the cache to set */ - private void setCache(InternalCache _cache) { - this._cache = _cache; + private void setCache(InternalCache cache) { + this.cache = cache; } + /** + * Non-static inner class ExpireBlackListTask + */ private class ExpireBlackListTask extends PoolTask { - private ClientProxyMembershipID proxyID; + private final ClientProxyMembershipID proxyID; - public ExpireBlackListTask(ClientProxyMembershipID proxyID) { + ExpireBlackListTask(ClientProxyMembershipID proxyID) { this.proxyID = proxyID; } @Override public void run2() { - if (blackListedClients.remove(proxyID)) { + if (blackListedClients.remove(this.proxyID)) { if (logger.isDebugEnabled()) { - logger.debug("{} client is no longer blacklisted", proxyID); + logger.debug("{} client is no longer blacklisted", this.proxyID); } } } } + + /** + * Static inner-class ServerInterestRegistrationMessage + * <p> + * this message is used to send interest registration to another server. Since interest + * registration performs a state-flush operation this message must not transmitted on an ordered + * socket + */ + public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage + implements MessageWithReply { + + ClientProxyMembershipID clientId; + ClientInterestMessageImpl clientMessage; + int processorId; + + ServerInterestRegistrationMessage(ClientProxyMembershipID clientID, + ClientInterestMessageImpl msg) { + this.clientId = clientID; + this.clientMessage = msg; + } + + public ServerInterestRegistrationMessage() { + // nothing + } + + static void sendInterestChange(DM dm, ClientProxyMembershipID clientID, + ClientInterestMessageImpl msg) { + ServerInterestRegistrationMessage registrationMessage = + new ServerInterestRegistrationMessage(clientID, msg); + Set recipients = dm.getOtherDistributionManagerIds(); + registrationMessage.setRecipients(recipients); + ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients); + registrationMessage.processorId = rp.getProcessorId(); + dm.putOutgoing(registrationMessage); + try { + rp.waitForReplies(); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + @Override + protected void process(DistributionManager dm) { + // Get the proxy for the proxy id + try { + CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); + if (clientNotifier != null) { + CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId); + // If this VM contains a proxy for the requested proxy id, forward the + // message on to the proxy f
<TRUNCATED>