http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClient.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClient.java b/core/src/flex/messaging/client/FlexClient.java deleted file mode 100644 index e212986..0000000 --- a/core/src/flex/messaging/client/FlexClient.java +++ /dev/null @@ -1,2240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -import flex.messaging.ConnectionAwareSession; -import flex.messaging.FlexContext; -import flex.messaging.FlexSession; -import flex.messaging.FlexSessionListener; -import flex.messaging.HttpFlexSession; -import flex.messaging.MessageClient; -import flex.messaging.MessageClientListener; -import flex.messaging.MessageException; -import flex.messaging.endpoints.Endpoint; -import flex.messaging.log.Log; -import flex.messaging.log.LogCategories; -import flex.messaging.messages.CommandMessage; -import flex.messaging.messages.Message; -import flex.messaging.services.MessageService; -import flex.messaging.util.StringUtils; -import flex.messaging.util.TimeoutAbstractObject; - -import java.security.Principal; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * Represents a Flex client application instance on the server. - */ -public class FlexClient extends TimeoutAbstractObject implements FlexSessionListener, MessageClientListener -{ - //-------------------------------------------------------------------------- - // - // Public Static Constants - // - //-------------------------------------------------------------------------- - - /** - * Log category for FlexClient related messages. - */ - public static final String FLEX_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_FLEXCLIENT; - - /** - * This value is passed to the server in an initial client connect to - * indicate that the client needs a server-assigned FlexClient Id. - * - */ - public static final String NULL_FLEXCLIENT_ID = "nil"; - - //-------------------------------------------------------------------------- - // - // Private Static Constants - // - //-------------------------------------------------------------------------- - - /** - * Error string constants. - */ - private static final int FLEX_CLIENT_INVALIDATED = 10027; - private static final int ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED = 10033; - - private static final String POLL_WAIT_THREAD_NAME_EXTENSION = "-in-poll-wait"; - - //-------------------------------------------------------------------------- - // - // Private Static Variables - // - //-------------------------------------------------------------------------- - - /** - * List of registered FlexClient created listeners. - */ - private static final CopyOnWriteArrayList<FlexClientListener> createdListeners = new CopyOnWriteArrayList<FlexClientListener>(); - - //-------------------------------------------------------------------------- - // - // Public Static Methods - // - //-------------------------------------------------------------------------- - - /** - * Adds a create listener that will be notified when new FlexClients - * are created. - * - * @see flex.messaging.client.FlexClientListener - * - * @param listener The listener to add. - */ - public static void addClientCreatedListener(FlexClientListener listener) - { - if (listener != null) - createdListeners.addIfAbsent(listener); - } - - /** - * Removes a FlexClient created listener. - * - * @see flex.messaging.client.FlexClientListener - * - * @param listener The listener to remove. - */ - public static void removeClientCreatedListener(FlexClientListener listener) - { - if (listener != null) - createdListeners.remove(listener); - } - - //-------------------------------------------------------------------------- - // - // Constructor - // - //-------------------------------------------------------------------------- - - /** - * - * Constructs a new FlexClient instance. - * - * @param manager The FlexClientManager managing this instance. - */ - public FlexClient(FlexClientManager manager) - { - this(manager, FlexContext.getMessageBroker().createUUID()); - } - - /** - * - * Constructs a new FlexClient instance having the specified Id. - * - * @param manager The FlexClientManager managing this instance. - * @param id The Id for this instance. - */ - public FlexClient(FlexClientManager manager, String id) - { - this.id = id; - flexClientManager = manager; - updateLastUse(); - valid = true; - - if (Log.isDebug()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient created with id '" + this.id + "'."); - } - - //-------------------------------------------------------------------------- - // - // Variables - // - //-------------------------------------------------------------------------- - - /** - * Storage for custom attributes. - */ - private volatile Map<String, Object> attributes; - - /** - * List of registered FlexClient attribute listeners. - */ - private volatile CopyOnWriteArrayList<FlexClientAttributeListener> attributeListeners; - - /** - * List of registered FlexClient destroyed listeners. - */ - private volatile CopyOnWriteArrayList<FlexClientListener> destroyedListeners; - - /** - * The manager for the FlexClient. - */ - final FlexClientManager flexClientManager; - - /** - * The unique Id for the instance. - */ - private final String id; - - /** - * Flag used to break cycles during invalidation. - */ - /* package visibility for FlexClientManager */ volatile boolean invalidating; - - /** - * Instance level lock to sync for state changes. - */ - final Object lock = new Object(); - - /** - * MessageClient subscriptions for this MessageClient. - */ - private volatile CopyOnWriteArrayList<MessageClient> messageClients; - - /** - * Queues of outbound messages to push to the client keyed by endpoint id. - * Map(String endpointId, EndpointQueue queue). - */ - private final Map<String, EndpointQueue> outboundQueues = new ConcurrentHashMap<String, EndpointQueue>(1); - - /** - * EndpointPushHandlers keyed by endpointId that the FlexClient - * can use to push messages to remote clients. - * NOTE: these can't be added to the EndpointQueue data type because the existence of queues depends - * upon client subscription state whereas endpoints that support push will generally set up their push - * handling before any subscriptions have been created. - */ - private Map<String, EndpointPushHandler> endpointPushHandlers; - - /** - * Associated FlexSessions that represent the connections the FlexClient makes to the server. - */ - private final CopyOnWriteArrayList<FlexSession> sessions = new CopyOnWriteArrayList<FlexSession>(); // We always have at least one session. - - /** - * Flag indicating whether the instance is valid; once invalidated this flag is - * set to false. - */ - boolean valid; - - /** - * The principal associated with this client. Only used when perClientAuthentication - * is being used. - */ - private Principal userPrincipal; - - //-------------------------------------------------------------------------- - // - // Public Methods - // - //-------------------------------------------------------------------------- - - /** - * Adds a FlexClient attribute listener that will be notified when an - * attribute is added, removed or changed. If the attribute implements - * FlexClientBindingListener, it will be notified before any - * FlexClientAttributeListeners are notified. - * - * @param listener The listener to add. - */ - public void addClientAttributeListener(FlexClientAttributeListener listener) - { - if (listener != null) - { - checkValid(); - - synchronized (lock) - { - if (attributeListeners == null) - attributeListeners = new CopyOnWriteArrayList<FlexClientAttributeListener>(); - } - - attributeListeners.addIfAbsent(listener); - } - } - - /** - * Adds a destroy listener that will be notified when the FlexClient - * is destroyed. Listeners are notified after all attributes - * have been unbound from the FlexClient and any FlexClientBindingListeners - * and FlexClientAttributeListeners have been notified. - * - * @see flex.messaging.client.FlexClientListener - * - * @param listener The listener to add. - */ - public void addClientDestroyedListener(FlexClientListener listener) - { - if (listener != null) - { - checkValid(); - - synchronized (lock) - { - if (destroyedListeners == null) - destroyedListeners = new CopyOnWriteArrayList<FlexClientListener>(); - } - - destroyedListeners.addIfAbsent(listener); - } - } - - /** - * Returns the attribute bound to the specified name for the FlexClient, or null - * if no attribute is bound under the name. - * - * @param name The name the attribute is bound to. - * @return The attribute bound to the specified name. - */ - public Object getAttribute(String name) - { - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - return (attributes == null) ? null : attributes.get(name); - } - } - - /** - * Returns a snapshot of the names of all attributes bound to the FlexClient. - * - * @return A snapshot of the names of all attributes bound to the FlexClient. - */ - public Enumeration<String> getAttributeNames() - { - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - if (attributes == null) - return Collections.enumeration(Collections.<String>emptyList()); - - // Return a copy so we do not run into concurrent modification problems if - // someone adds to the attributes while iterating through the returned enumeration. - return Collections.enumeration(new ArrayList<String>(attributes.keySet())); - } - } - - /** - * - * Returns the push handler registered with the FlexClient with the supplied - * endpoint id, or null if no push handler was registered with the FlexClient - * for that endpoint. - * - * @return The push handler registered with the FlexClient with the supplied - * endpoint id, or null if no push handler was registered with the FlexClient - * for that endpoint. - */ - public EndpointPushHandler getEndpointPushHandler(String endpointId) - { - synchronized (lock) - { - if (endpointPushHandlers != null && endpointPushHandlers.containsKey(endpointId)) - return endpointPushHandlers.get(endpointId); - return null; - } - } - - /** - * - * Returns the queue processor registered with the FlexClient with the supplied - * endpoint id, or null if no queue processor was registered with the FlexClient - * for that endpoint. - * - * @param endpointId The endpoint id. - * @return The queue processor registered with the FlexClient. - */ - public FlexClientOutboundQueueProcessor getOutboundQueueProcessor(String endpointId) - { - EndpointQueue queue = outboundQueues.get(endpointId); - return (queue != null)? queue.processor : null; - } - - /** - * - * Returns the endpoint queue registered with the FlexClient with the supplied - * endpoint id, or null if no endpoint queue was registered with the FlexClient - * for that endpoint. - * - * @param endpointId The endpoint id. - * @return The endpoint queue registered with the FlexClient. - */ - public EndpointQueue getEndpointQueue(String endpointId) - { - return outboundQueues.get(endpointId); - } - - /** - * Override {@link flex.messaging.util.TimeoutAbstractObject#getLastUse()} to make timeout - * dependent upon FlexClient inactivity but also upon the presence of an active push-enabled session, - * async or waited poll, or registered endpoint push handler (all of which indicate that a client has - * an active, open connection to the server). - * - * @return The 'last use' timestamp for the FlexClient, which may be the current system time if the FlexClient - * has been idle but an open connection from the client to the server exists. - */ - @Override - public long getLastUse() - { - synchronized (lock) - { - long currentLastUse = super.getLastUse(); - long idleTime = System.currentTimeMillis() - currentLastUse; - if (idleTime < flexClientManager.getFlexClientTimeoutMillis()) - return currentLastUse; // Not timed out; this will trigger the timeout to be rescheduled. - - // Check for async long-polls or endpoint streaming connections, if found, keep alive. - if (!outboundQueues.isEmpty()) - { - for (EndpointQueue queue : outboundQueues.values()) - { - if (queue.asyncPoll != null) - return System.currentTimeMillis(); - - if (endpointPushHandlers != null && endpointPushHandlers.containsKey(queue.endpointId)) - return System.currentTimeMillis(); - } - } - - // Check for connected sessions, or a session holding a (non-async) long poll and if found, keep alive. - for (FlexSession session : sessions) - { - if (session instanceof ConnectionAwareSession) - { - if (((ConnectionAwareSession)session).isConnected()) - return System.currentTimeMillis(); - } - // Otherwise, check for a long-poll. - if (session.waitMonitor != null) - { - for (EndpointQueue queue : session.waitMonitor.values()) - { - if (queue.flexClient.equals(this)) - return System.currentTimeMillis(); - } - } - } - return currentLastUse; // Allow the FlexClient to timeout. - } - } - - /** - * Returns the attribute bound to the specified name for the current FlexSession - * associated with the FlexClient. If the attribute does not exist in the current - * FlexSession, this method iterates through all the other FlexSessions associated with - * the FlexClient and either returns the attribute bound, or null if no attribute is bound - * under the name. - * - * @param name The name the attribute is bound to. - * @return The attribute bound to the specified name. - */ - public Object getSessionAttribute(String name) - { - Object attributeValue = getSessionAttributeInCurrentSession(name); - if (attributeValue != null) - return attributeValue; - - return getSessionAttributeInOtherSessions(name); - } - - /** - * Returns a snapshot of the names of all attributes bound to all the FlexSessions - * associated with the FlexClient. - * - * @return A snapshot of the names of all attributes bound to all the FlexSessions - * associated with the FlexClient. - */ - public Enumeration<String> getSessionAttributeNames() - { - Set<String> attributeNames = new HashSet<String>(); - for (FlexSession session : sessions) - attributeNames.addAll(getSessionAttributeNames(session)); - return Collections.enumeration(attributeNames); - } - - /** - * - * - * Returns the principal associated with this client. If the client has not - * authenticated the principal will be null. Should only be called from FlexContext - * and only if perClientAuthentication is used. Not available to users. - * - * @return The principal associated with the session. - */ - public Principal getUserPrincipal() - { - synchronized (lock) - { - checkValid(); - return userPrincipal; - } - } - - /** - * - * - * Should only be called from FlexContext and only if perClientAuthentication is used. - * Not available to users. - * - * @param userPrincipal The principal to associate with the session. - */ - public void setUserPrincipal(Principal userPrincipal) - { - synchronized (lock) - { - checkValid(); - this.userPrincipal = userPrincipal; - } - } - - /** - * Invalidates the FlexClient. - */ - public void invalidate() - { - synchronized (lock) - { - if (!valid || invalidating) - return; // Already shutting down. - - invalidating = true; // This thread gets to shut the FlexClient down. - flexClientManager.removeFlexClient(this); - cancelTimeout(); - } - - // Unregister from all FlexSessions. - if (!sessions.isEmpty()) - { - for (FlexSession session : sessions) - unregisterFlexSession(session); - } - - // Invalidate associated MessageClient subscriptions. - if (messageClients != null && !messageClients.isEmpty()) - { - for (MessageClient messageClient : messageClients) - { - messageClient.removeMessageClientDestroyedListener(this); - messageClient.invalidate(); - } - messageClients.clear(); - } - - // Notify destroy listeners that we're shutting the FlexClient down. - if (destroyedListeners != null && !destroyedListeners.isEmpty()) - { - for (FlexClientListener destroyListener : destroyedListeners) - { - destroyListener.clientDestroyed(this); - } - destroyedListeners.clear(); - } - - // Unbind all attributes. - if (attributes != null && !attributes.isEmpty()) - { - Set<String> keySet = attributes.keySet(); - String[] keys = keySet.toArray(new String[keySet.size()]); - for (String key : keys) - removeAttribute(key); - } - - // Close any registered push handlers. - if (endpointPushHandlers != null && !endpointPushHandlers.isEmpty()) - { - for (EndpointPushHandler handler : endpointPushHandlers.values()) - { - handler.close(true /* notify Channel of disconnect */); - } - endpointPushHandlers = null; - } - - synchronized (lock) - { - valid = false; - invalidating = false; - } - - if (Log.isDebug()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient with id '" + this.id + "' has been invalidated."); - } - - /** - * Returns true if the FlexClient is valid; false if it has been invalidated. - * - * @return true if the FlexClient is valid; otherwise false. - */ - public boolean isValid() - { - synchronized (lock) - { - return valid; - } - } - - /** - * Returns a snapshot of the FlexSessions associated with the FlexClient - * when this method is invoked. - * This list is not guaranteed to remain consistent with the actual list - * of active FlexSessions associated with the FlexClient over time. - * - * @return A snapshot of the current list of FlexSessions associated with the FlexClient. - */ - public List<FlexSession> getFlexSessions() - { - List<FlexSession> currentSessions; - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - currentSessions = new ArrayList<FlexSession>(sessions); // Make a copy of the current list to return. - } - return currentSessions; - } - - /** - * Return the session count. - * - * @return The number of sessions associated with this FlexClient. - */ - public int getSessionCount() - { - int sessionCount; - synchronized (lock) - { - sessionCount = (sessions != null) ? sessions.size() : 0; // Make a copy of the current list to return. - } - return sessionCount; - } - - /** - * Return the subscription count. - * - * @return The number of subscriptions associated with this FlexClient. - */ - public int getSubscriptionCount() - { - int count = 0; - synchronized (lock) - { - - if (messageClients != null && !messageClients.isEmpty()) - { - for (MessageClient messageClient : messageClients) - count += messageClient.getSubscriptionCount(); - } - - } - return count; - } - - /** - * Returns the message client registered with the FlexClient with the supplied - * client id, or null if no message client was registered with the FlexClient - * with that client id. - * - * @param clientId The client id. - * @return The message client registered with the FlexClient. - */ - public MessageClient getMessageClient(String clientId) - { - synchronized (lock) - { - if (messageClients != null && !messageClients.isEmpty()) - { - for (MessageClient messageClient : messageClients) - { - if (messageClient.getClientId().equals(clientId)) - return messageClient; - } - } - } - return null; - } - - /** - * Returns a snapshot of the MessageClients (subscriptions) associated with the FlexClient - * when this method is invoked. - * This list is not guaranteed to remain consistent with the actual list - * of active MessageClients associated with the FlexClient over time. - * - * @return A snapshot of the current list of MessageClients associated with the FlexClient. - */ - public List<MessageClient> getMessageClients() - { - List<MessageClient> currentMessageClients; - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - currentMessageClients = (messageClients != null) ? new ArrayList<MessageClient>(messageClients) // Make a copy of the current list to return. - : Collections.<MessageClient>emptyList(); // Return an empty list. - } - return currentMessageClients; - } - - /** - * Returns the unique Id for the FlexClient. - * - * @return The unique Id for the FlexClient. - */ - public String getId() - { - return id; - } - - /** - * - * Implements TimeoutCapable. - * Determine the time, in milliseconds, that this object is allowed to idle - * before having its timeout method invoked. - */ - @Override - public long getTimeoutPeriod() - { - return flexClientManager.getFlexClientTimeoutMillis(); - } - - /** - * - * Implements MessageClientListener. - * Handling created events is a no-op. - * - * @param messageClient The new MessageClient. - */ - public void messageClientCreated(MessageClient messageClient) {} - - /** - * - * Implements MessageClientListener. - * Notification that an associated FlexSession was destroyed. - * - * @param messageClient The MessageClient that was destroyed. - */ - public void messageClientDestroyed(MessageClient messageClient) - { - unregisterMessageClient(messageClient); - } - - /** - * - * Poll for outbound messages for the FlexClient. - * This method is only invoked by internal code while processing a client poll request; it - * is not intended for general public use. - * Poll requests that trigger this method come from client-side polling channels and the request - * is not specific to a single Consumer/MessageClient instance so process any queued messages for - * the specified endpoint across all subscriptions. - * - * @param endpointId The Id of the endpoint that received the poll request. - * @return The flush result including messages to return in the poll response and - * an optional wait time for the next poll/flush. - */ - public FlushResult poll(String endpointId) - { - EndpointQueue queue = null; - synchronized (lock) - { - checkValid(); - - queue = outboundQueues.get(endpointId); - - if (queue != null) - return internalPoll(queue); - } - - if (queue == null) - { - // Otherwise, the client is not subscribed. - throwNotSubscribedException(endpointId); - } - - return null; - } - - /** - * - * Poll for outbound messages for the FlexClient and if no messages are available - * immediately, store a reference to the passed async handler and call back when messages arrive. - * - * @param endpointId The Id of the endpoint that received the poll request. - * @param handler The handler to callback when messages arrive. - * @param waitIntervalMillis The wait interval in milliseconds for the poll to wait for data to arrive - * before returning an empty poll response. - * - * @return A <tt>TimeoutAbstractObject</tt> representing the asynchronous poll, or <code>null</code> - * if the poll request was handled immediately because data was available to return. - */ - public TimeoutAbstractObject pollAsync(String endpointId, AsyncPollHandler handler, long waitIntervalMillis) - { - EndpointQueue queue; - TimeoutAbstractObject asyncPollTask = null; - - synchronized (lock) - { - checkValid(); - - queue = outboundQueues.get(endpointId); - - // If the queue exists and is not empty, flush immediately. - if (queue != null) - { - if (!queue.messages.isEmpty()) - { - handler.asyncPollComplete(internalFlush(queue)); - } - else // Set up an async long-poll. - { - // Avoid monopolizing user agent connections. - FlexSession session = FlexContext.getFlexSession(); - synchronized (session) - { - if (session.asyncPollMap != null) - { - AsyncPollWithTimeout parkedPoll = session.asyncPollMap.get(endpointId); - if (parkedPoll != null) - { - // If the poll is from the same client for this endpoint, treat it as a no-op. - if (parkedPoll.getFlexClient().equals(this)) - { - PollFlushResult result = new PollFlushResult(); - result.setClientProcessingSuppressed(true); - handler.asyncPollComplete(result); - } - else // If the poll is for a different client on the same session, swap their waits. - { - PollFlushResult result = new PollFlushResult(); - result.setAvoidBusyPolling(true); - completeAsyncPoll(parkedPoll, result); - } - } - } - AsyncPollWithTimeout asyncPoll = new AsyncPollWithTimeout(this, session, queue, handler, waitIntervalMillis, endpointId); - synchronized (session) - { - if (session.asyncPollMap == null) - session.asyncPollMap = new HashMap<String, AsyncPollWithTimeout>(); - session.asyncPollMap.put(endpointId, asyncPoll); - } - queue.asyncPoll = asyncPoll; - asyncPollTask = asyncPoll; - } - } - } - } - if (queue == null) - { - // The queue was null; let the client know that there are no active subscriptions. - throwNotSubscribedException(endpointId); - } - return asyncPollTask; - } - - /** - * - * Poll for outbound messages for the FlexClient and if no messages are available - * immediately, put processing into a wait state until messages arrive. - * This method is only invoked by internal code while processing a client poll request; it - * is not intended for general public use. - * Poll requests that trigger this method come from client-side polling channels and the request - * is not specific to a single Consumer/MessageClient instance so process any queued messages for - * the specified endpoint across all subscriptions. - * - * @param endpointId The Id of the endpoint that received the poll request. - * @param session The FlexSession associated with this waitable poll request. - * @param listener The listener to notify before a wait begins and as soon as one completes. - * @param waitIntervalMillis The maximum amount of time to wait for messages in milliseconds. - * @return The flush result including messages to return in the poll response and - * an optional wait time for the next poll/flush. - */ - public FlushResult pollWithWait(String endpointId, FlexSession session, PollWaitListener listener, long waitIntervalMillis) - { - EndpointQueue queue; - synchronized (lock) - { - checkValid(); - - queue = outboundQueues.get(endpointId); - - // If the queue exists and is not empty there's no reason to wait; flush immediately. - if (queue != null) - { - FlushResult flushResult = internalPoll(queue); - if (flushResult != null) - return flushResult; - } - } - - // The queue exists but it was empty; we can try to wait for messages. - if (queue != null) - { - synchronized (session) - { - // Set up the waitMonitor on the session; this is a reference to the queue that the - // current poll request targets and we use it as a wait/notify monitor. - // This also lets us prevent busy polling cycles from a single client. If we already have a waited - // poll request a subsequent poll request is treated as a no-op. - if (session.waitMonitor != null) - { - final EndpointQueue waitingQueue = session.waitMonitor.get(endpointId); - // If the poll is from the same client swf, and the same endpoint, treat it as a no-op poll. - if (waitingQueue != null && waitingQueue.flexClient.equals(this)) - { - PollFlushResult result = new PollFlushResult(); - result.setClientProcessingSuppressed(true); - return result; - } - } - else - { - session.waitMonitor = new HashMap<String, EndpointQueue>(); - } - - // Set the waitMonitor for the session to the queue - // for this poll request before releasing the lock. - session.waitMonitor.put(endpointId, queue); - } - - // Now that the session references the wait monitor this thread will use to wait we can enter - // the wait state. - // -1 wait-interval actually means wait until notified. - waitIntervalMillis = (waitIntervalMillis == -1) ? 0 : waitIntervalMillis; - String threadName = Thread.currentThread().getName(); - try - { - boolean didWait = false; - boolean avoidBusyPolling = false; - synchronized (queue) - { - // If the message queue is still empty, wait for a message to be added before invoking flush. - if (queue.messages.isEmpty()) - { - reportStatusIfDebug("waiting for new messages to arrive"); - - didWait = true; - - // Tag thread name during the wait. - Thread currentThread = Thread.currentThread(); - currentThread.setName(threadName + POLL_WAIT_THREAD_NAME_EXTENSION); - - if (listener != null) - listener.waitStart(queue); - - queue.waitPoll = true; // Mark the queue as waiting. - - queue.wait(waitIntervalMillis); - - queue.waitPoll = false; // Unmark the queue as waiting. - - // Reset thread name now that the wait is over. - currentThread.setName(threadName); - - if (listener != null) - listener.waitEnd(queue); - - if (queue.avoidBusyPolling) - { - avoidBusyPolling = true; - queue.avoidBusyPolling = false; - } - } - } - - synchronized (session) - { - if (session.waitMonitor != null) - { - session.waitMonitor.remove(endpointId); - } - } - - if (Log.isDebug()) - { - if (didWait) - reportStatusIfDebug("done waiting for new messages to arrive and is flushing the outbound queue"); - else - reportStatusIfDebug("didn't need to wait and is flushing the outbound queue"); - } - - // We need to hold the FlexClient lock to invoke flush. - FlushResult result; - synchronized (lock) - { - result = internalFlush(queue); - } - if (avoidBusyPolling) - { - PollFlushResult swappedPollResult = new PollFlushResult(); - if (result != null) - { - swappedPollResult.setMessages(result.getMessages()); - swappedPollResult.setNextFlushWaitTimeMillis(result.getNextFlushWaitTimeMillis()); - } - swappedPollResult.setAvoidBusyPolling(true); - result = swappedPollResult; - } - return result; - } - catch (InterruptedException e) - { - if (Log.isWarn()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id + - "' could not finish waiting for new messages to arrive " + - "because it was interrupted: " + e.toString()); - } - } - else - { - // The queue was null; let the client know that there are no active subscriptions. - throwNotSubscribedException(endpointId); - } - return null; - } - - private void reportStatusIfDebug(String message) - { - String threadName = Thread.currentThread().getName(); - if (Log.isDebug()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id + "' is " + message); - } - - /** - * - * Poll for outbound messages for a specific MessageClient/Consumer. - * This overload of poll() is only invoked when handling a Consumer.receive() request. - * - * @param client The specific MessageClient instance to poll for messages for. - * @return The flush result including messages to return in the poll response. - * The nextFlushWaitTimeMillis value is always forced to a value of 0 because - * Consumer.receive() calls are driven by client code and this setting has no meaning. - */ - public FlushResult poll(MessageClient client) - { - FlushResult flushResult = null; - String endpointId = client.getEndpointId(); - EndpointQueue queue = null; - synchronized (lock) - { - checkValid(); - - queue = outboundQueues.get(endpointId); - if (queue != null) - { - try - { - flushResult = internalFlush(queue, client); - } - catch (RuntimeException e) - { - if (Log.isError()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for MessageClient '" + client.getClientId() + "' for FlexClient '" + getId() + "'.", e); - throw e; - } - if (flushResult != null) - flushResult.setNextFlushWaitTimeMillis(0); // Force to 0. - } - } - if (queue == null) - { - throwNotSubscribedException(endpointId); - } - return flushResult; - } - - /** - * - * Push a message to the FlexClient. - * The message is added to the outbound queue of messages for the client and - * will be pushed if possible or retrieved via a client poll request. - * - * @param message The Message to push. - * @param messageClient The MessageClient subscription that this message targets. - */ - public void push(Message message, MessageClient messageClient) - { - // We should check the message client is valid or not - if (!isValid()) - return; - - // Route this message to the proper per-endpoint outbound queue. - EndpointQueue queue = outboundQueues.get(messageClient.getEndpointId()); - - // This queue may be null if all corresponding subscriptions have been invalidated. - if (queue == null) - return; - - boolean empty; - - // We need to obtain the lock here - // Maintain the pattern of using the FlexClient.lock and ensure that order of locks should always start with the FlexClient.lock - // This is critical to prevent deadlock cases, see Watson bug 2724938 - synchronized (lock) - { - synchronized (queue) // To protect the list during the add and allow for notification. - { - // Let the processor add the message to the queue. - try - { - queue.processor.add(queue.messages, message); - empty = queue.messages.isEmpty(); - - if (Log.isDebug()) - Log.getLogger(LogCategories.MESSAGE_GENERAL).debug( - "Queuing message: " + message.getMessageId() + - StringUtils.NEWLINE + - " to send to MessageClient: " + messageClient.getClientId() + - StringUtils.NEWLINE + - " for FlexClient: " + messageClient.getFlexClient().getId() + - StringUtils.NEWLINE + - " via endpoint: " + queue.endpointId + - StringUtils.NEWLINE + - " client outbound queue size: " + queue.messages.size()); - } - catch (RuntimeException e) - { - if (Log.isError()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to add a message to an outbound queue for FlexClient '" + getId() + "'.", e); - throw e; - } - // And notify any threads that may be in a poll wait state. - if (!empty && queue.waitPoll) - { - // TODO This updateLastUse call is added here because there used to be a call - // at the beginning of the push method but not convinced that it is needed. - updateLastUse(); - queue.notifyAll(); - } - } - - if (!empty) - { - if (queue.asyncPoll != null) - { - completeAsyncPoll(queue.asyncPoll, internalFlush(queue)); - } - else if (!empty && queue.flushTask == null && - (queue.pushSession != null || (endpointPushHandlers != null && endpointPushHandlers.containsKey(queue.endpointId)))) - { - // If a delayed flush is not scheduled and we have a push-enabled session associated with the queue - // or a push-enabled endpoint, try a direct push to the client. - // Once again we should acquire the lock for queue, otherwise a potential dead lock could happen, see Watson bug 2724936 - // By acquiring the queue lock again, we break the cycle by acquiring the queue before holding FlexClient.lock object - synchronized (queue) - { - directFlush(queue); - } - } - } - } - } - - /** - * - * Registers an <tt>EndpointPushHandler</tt> for the specified endpoint to handle pushing messages - * to remote clients. - * - * @param handler The <tt>EndpointPushHandler</tt> to register. - * @param endpointId The endpoint to register for. - */ - public void registerEndpointPushHandler(EndpointPushHandler handler, String endpointId) - { - synchronized (lock) - { - if (endpointPushHandlers == null) - endpointPushHandlers = new HashMap<String, EndpointPushHandler>(1); - - if (endpointPushHandlers.containsKey(endpointId)) - { - MessageException me = new MessageException(); - me.setMessage(ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED, new Object[] {getId(), endpointId}); - throw me; - } - - endpointPushHandlers.put(endpointId, handler); - } - } - - /** - * - * Used internally to associate a FlexSession with this FlexClient. - * - * @param session The FlexSession to associate with this FlexClient. - */ - public void registerFlexSession(FlexSession session) - { - if (sessions.addIfAbsent(session)) - { - session.addSessionDestroyedListener(this); - session.registerFlexClient(this); - } - } - - /** - * - * Used internally to associate a MessageClient with this FlexClient. - * - * @param messageClient The MessageClient to associate with this FlexClient. - */ - public void registerMessageClient(MessageClient messageClient) - { - synchronized (lock) - { - if (messageClients == null) - messageClients = new CopyOnWriteArrayList<MessageClient>(); - } - - if (messageClients.addIfAbsent(messageClient)) - { - messageClient.addMessageClientDestroyedListener(this); - String endpointId = messageClient.getEndpointId(); - // Manage the outbound queue this MessageClient's subscription(s) will use - // and associate the MessageClient with an EndpointPushHandler if one exists for the - // endpoint the subscription was made over; this allows the shut-down of a - // push connection to invalidate any subscriptions that are using it. - synchronized (lock) - { - getOrCreateEndpointQueueAndRegisterSubscription(messageClient, endpointId); - if (endpointPushHandlers != null) - { - EndpointPushHandler handler = endpointPushHandlers.get(endpointId); - if (handler != null) - handler.registerMessageClient(messageClient); - } - } - } - } - - /** - * Removes the attribute bound to the specified name for the FlexClient. - * - * @param name The name of the attribute to remove. - */ - public void removeAttribute(String name) - { - Object value; // Used for event dispatch after the attribute is removed. - - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - value = (attributes != null) ? attributes.remove(name) : null; - } - - // If no value was bound under this name it's a no-op. - if (value == null) - return; - - notifyAttributeUnbound(name, value); - notifyAttributeRemoved(name, value); - } - - /** - * Removes a FlexClient attribute listener. - * - * @param listener The listener to remove. - */ - public void removeClientAttributeListener(FlexClientAttributeListener listener) - { - // No need to check validity; removing a listener is always ok. - if (listener != null && attributeListeners != null) - attributeListeners.remove(listener); - } - - /** - * Removes a FlexClient destroyed listener. - * - * @see flex.messaging.client.FlexClientListener - * - * @param listener The listener to remove. - */ - public void removeClientDestroyedListener(FlexClientListener listener) - { - // No need to check validity; removing a listener is always ok. - if (listener != null && destroyedListeners != null) - destroyedListeners.remove(listener); - } - - /** - * Removes the attribute bound to the specified name for all the FlexSessions - * associated with the FlexClient. - * - * @param name The name of the attribute to remove. - */ - public void removeSessionAttribute(String name) - { - for (FlexSession session : sessions) - session.removeAttribute(name); - } - - /** - * - * Implements FlexSessionListener interface. - * Notification that a FlexSession was created. - * This is a no-op because the FlexClient is never added as a static FlexSession created listener - * but this method is required by the interface. We only listen for the destroyed event from - * associated FlexSessions. - * - * @param session The FlexSession that was created. - */ - public void sessionCreated(FlexSession session) {} - - /** - * - * Implements FlexSessionListener interface. - * Notification that an associated FlexSession was destroyed. - * - * @param session The FlexSession that was destroyed. - */ - public void sessionDestroyed(FlexSession session) - { - unregisterFlexSession(session); - } - - /** - * Binds an attribute value for the FlexClient under the specified name. - * - * @param name The name to bind the attribute under. - * @param value The value of the attribute. - */ - public void setAttribute(String name, Object value) - { - // Null value set is the same as removeAttribute(). - if (value == null) - { - removeAttribute(name); - return; - } - - Object oldValue; // Used to determine which events to dispatch after the set is performed. - - // Only synchronize for the attribute mutation; event dispatch doesn't require it. - synchronized (lock) - { - checkValid(); - - updateLastUse(); - - if (attributes == null) - attributes = new HashMap<String, Object>(); - - oldValue = attributes.put(name, value); - } - - if (oldValue == null) - { - notifyAttributeBound(name, value); - notifyAttributeAdded(name, value); - } - else - { - notifyAttributeUnbound(name, oldValue); - notifyAttributeReplaced(name, oldValue); - notifyAttributeBound(name, value); - } - } - - /** - * Binds an attribute value for the current FlexSession associated with the - * FlexClient under the specified name. If the current FlexSession is NIO-based - * (NIOHTTPFlexSession or RTMPFlexSession), and if the FlexClient is associated - * with a Servlet-based session (HttpFlexSession) as well, the attribute is bound - * on the Servlet-based session too to make it available to the underlying J2EE HttpSession. - * - * @param name The name to bind the attribute under. - * @param value The value of the attribute. - */ - public void setSessionAttribute(String name, Object value) - { - setSessionAttributeInCurrentSession(name, value); - if (!isCurrentSessionServletBased()) - setSessionAttributeInServletBasedSession(name, value); - } - - /** - * - * Implements TimeoutCapable. - * Inform the object that it has timed out. - */ - public void timeout() - { - invalidate(); - } - - /** - * - * Unregisters an <tt>EndpointPushHandler</tt> from the specified endpoint. - * - * @param handler The <tt>EndpointPushHandler</tt> to unregister. - * @param endpointId The endpoint to unregister from. - */ - public void unregisterEndpointPushHandler(EndpointPushHandler handler, String endpointId) - { - synchronized (lock) - { - if (endpointPushHandlers == null) - return; // No-op. - - if (endpointPushHandlers.get(endpointId).equals(handler)) - endpointPushHandlers.remove(endpointId); - } - } - - /** - * - * Used internally to disassociate a FlexSession from this FlexClient. - * - * @param session The FlexSession to disassociate from this FlexClient. - */ - public void unregisterFlexSession(FlexSession session) - { - if (sessions.remove(session)) - { - session.removeSessionDestroyedListener(this); - session.unregisterFlexClient(this); - // Once all client sessions/connections terminate; shut down. - if (sessions.isEmpty()) - invalidate(); - } - } - - /** - * - * Used internally to disassociate a MessageClient (subscription) from a FlexClient. - * - * @param messageClient The MessageClient to disassociate from the FlexClient. - */ - public void unregisterMessageClient(MessageClient messageClient) - { - if (messageClients != null && messageClients.remove(messageClient)) - { - messageClient.removeMessageClientDestroyedListener(this); - String endpointId = messageClient.getEndpointId(); - // Manage the outbound queue that this subscription uses. - synchronized (lock) - { - EndpointQueue queue = outboundQueues.get(endpointId); - if (queue != null) - { - // Decrement the ref count of MessageClients using this queue. - queue.messageClientRefCount--; - - // Unregister the message client from the outbound throttle - // manager (if one exists). - OutboundQueueThrottleManager tm = queue.processor.getOutboundQueueThrottleManager(); - if (tm != null) - tm.unregisterAllSubscriptions(messageClient.getDestinationId()); - - // If we're not attempting to notify the remote client that this MessageClient has - // been invalidated, remove any associated messages from the queue. - if (!messageClient.isAttemptingInvalidationClientNotification()) - { - Object messageClientId = messageClient.getClientId(); - for (Iterator<Message> iter = queue.messages.iterator(); iter.hasNext(); ) - { - Message message = iter.next(); - if (message.getClientId().equals(messageClientId)) - iter.remove(); - } - } - - // If no active subscriptions require the queue, clean it up if possible. - if (queue.messageClientRefCount == 0) - { - if (queue.messages.isEmpty() || messageClient.isClientChannelDisconnected()) - { - if (queue.asyncPoll != null) // Close out async long-poll if one is registered. - { - FlushResult flushResult = internalFlush(queue); - // If the MessageClient isn't attempting client notification, override - // and do so in this case to suppress the next poll request from the remote client - // which will fail triggering an unnecessary channel disconnect on the client. - if (!messageClient.isAttemptingInvalidationClientNotification()) - { - CommandMessage msg = new CommandMessage(); - msg.setClientId(messageClient.getClientId()); - msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION); - List<Message> messages = flushResult.getMessages(); - if (messages == null) - messages = new ArrayList<Message>(1); - messages.add(msg); - } - completeAsyncPoll(queue.asyncPoll, flushResult); - } - - // Remove the empty, unused queue. - outboundQueues.remove(endpointId); - } - // Otherwise, the queue is being used by a polling client or contains messages - // that will be written by a delayed flush. - // Leave it in place. Once the next poll request or delayed flush occurs the - // queue will be cleaned up at that point. See internalFlush() and shutdownQueue(). - } - - // Make sure to notify any threads waiting on this queue that may be associated - // with the subscription that's gone away. - synchronized (queue) - { - queue.notifyAll(); - } - } - // And if this subscription was associated with an endpoint push handler, unregister it. - if (endpointPushHandlers != null) - { - EndpointPushHandler handler = endpointPushHandlers.get(endpointId); - if (handler != null) - handler.unregisterMessageClient(messageClient); - } - } - } - } - - //-------------------------------------------------------------------------- - // - // Protected Methods - // - //-------------------------------------------------------------------------- - - /** - * Utility method that tests validity and throws an exception if the instance - * has been invalidated. - */ - protected void checkValid() - { - synchronized (lock) - { - if (!valid) - { - MessageException e = new MessageException(); - e.setMessage(FLEX_CLIENT_INVALIDATED); - throw e; - } - } - } - - /** - * Invoked to clean up a timed out or closed async poll. - * - * @param asyncPoll The async poll to complete. - * @param result The FlushResult for the poll response. - */ - protected void completeAsyncPoll(AsyncPollWithTimeout asyncPoll, FlushResult result) - { - synchronized (lock) - { - asyncPoll.cancelTimeout(); - EndpointQueue queue = asyncPoll.getEndpointQueue(); - if (queue.asyncPoll.equals(asyncPoll)) - queue.asyncPoll = null; - FlexSession session = asyncPoll.getFlexSession(); - synchronized (session) - { - if (session.asyncPollMap != null) - session.asyncPollMap.remove(asyncPoll.getEndpointId()); - } - asyncPoll.getHandler().asyncPollComplete(result); - } - } - - /** - * Invoked to flush queued outbound messages to a client directly using a session - * that supports real-time push. - * Called by push() or delayed flush tasks for push-enabled sessions/connections. - */ - protected void directFlush(EndpointQueue queue) - { - synchronized (lock) - { - // No need to invoke flush if the FlexClient has been invalidated. - if (!valid) - return; - - // If this invocation is a callback from a flush task, null out the task ref on - // the queue to allow a subsequent delayed flush to be scheduled. - if (queue.flushTask != null) - queue.flushTask = null; - - FlushResult flushResult = internalFlush(queue, null, false /* updateLastUse */); - if (flushResult == null) // If there's no flush result, return. - return; - - // Pass any messages that are ready to flush off to the network layer. - List<Message> messages = flushResult.getMessages(); - if (messages != null && !messages.isEmpty()) - { - if (queue.pushSession != null) - { - if (queue.pushSession instanceof ConnectionAwareSession) - { - // Update last use only if we're actually writing back to the client. - if ( ((ConnectionAwareSession)queue.pushSession).isConnected() ) - updateLastUse(); - } - for (Message msg : messages) - queue.pushSession.push(msg); - } - else if (endpointPushHandlers != null) - { - updateLastUse(); - EndpointPushHandler handler = endpointPushHandlers.get(queue.endpointId); - handler.pushMessages(messages); - } - } - - // Schedule a delayed flush if necessary. - int flushWaitTime = flushResult.getNextFlushWaitTimeMillis(); - if (flushWaitTime > 0) // Set up and schedule the delayed flush task. - queue.flushTask = new FlexClientScheduledFlushForPush(queue, flushWaitTime); - } - } - - /** - * Utility method to initialize an EndpointQueue (if necessary) and associate a subscription (MessageClient) with it. - */ - protected EndpointQueue getOrCreateEndpointQueueAndRegisterSubscription(MessageClient messageClient, String endpointId) - { - EndpointQueue newQueue; - if (!outboundQueues.containsKey(endpointId)) - { - newQueue = new EndpointQueue(); - newQueue.flexClient = this; - newQueue.endpointId = endpointId; - newQueue.endpoint = flexClientManager.getMessageBroker().getEndpoint(endpointId); - newQueue.messages = new ArrayList<Message>(); /* Default size of 10 is fine */ - FlexSession session = messageClient.getFlexSession(); - if (session.isPushSupported()) - newQueue.pushSession = session; - newQueue.processor = flexClientManager.createOutboundQueueProcessor(this, endpointId); - newQueue.messageClientRefCount = 1; - - outboundQueues.put(endpointId, newQueue); - } - else - { - newQueue = outboundQueues.get(endpointId); - newQueue.messageClientRefCount++; - // Resubscribes as a result of network connectivity issues may arrive over the same - // endpoint but use a new session. - FlexSession session = messageClient.getFlexSession(); - if (session.isPushSupported()) - newQueue.pushSession = session; - } - return newQueue; - } - - /** - * Utility method to flush the outbound queue and log any problems. - * Any exceptions are logged and then rethrown. - * - * @param queue The outbound queue to flush. - */ - protected FlushResult internalFlush(EndpointQueue queue) - { - return internalFlush(queue, null); - } - - /** - * Utility method to flush the outbound queue and log any problems. - * If a specific client is passed, we need to invoke a client-specific flush. - * If the passed client is null, we do a general flush of the queue. - * Any exceptions are logged and then rethrown. - * - * @param queue The outbound queue to flush. - * @param client The client to flush for. - */ - protected FlushResult internalFlush(EndpointQueue queue, MessageClient client) - { - return internalFlush(queue, null, true); - } - - /** - * Utility method to flush the outbound queue and log any problems. - * If a specific client is passed, we need to invoke a client-specific flush. - * If the passed client is null, we do a general flush of the queue. - * Any exceptions are logged and then rethrown. - * - * @param queue The outbound queue to flush. - * @param client The client to flush for. - * @param updateLastUse Whether the last-use timestamp of the FlexClient should - * be updated. - */ - protected FlushResult internalFlush(EndpointQueue queue, MessageClient client, - boolean updateLastUse) - { - FlushResult flushResult; - try - { - synchronized (queue) - { - flushResult = queue.processor.flush(client, queue.messages); - shutdownQueue(queue); - } - - if (updateLastUse) - updateLastUseIfNecessary(flushResult); - } - catch (RuntimeException e) - { - if (Log.isError()) - Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for FlexClient '" + getId() + "'.", e); - throw e; - } - return flushResult; - } - - /** - * Utility method to flush messages in response to a poll request with - * regular and wait poll. - * - * @param queue The endpoint queue to flush messages for. - * @return The flush result with messages, or null if there are no messages. - */ - protected FlushResult internalPoll(EndpointQueue queue) - { - List<Message> allMessages = new ArrayList<Message>(); - - // First, add the previously flushed messages. - if (queue.flushedMessagesBetweenPolls != null && queue.flushedMessagesBetweenPolls.size() > 0) - { - allMessages.addAll(queue.flushedMessagesBetweenPolls); - queue.flushedMessagesBetweenPolls.clear(); - } - - // Then, check for regularly queued messages. We call internalFlush - // even if the queue is empty so the queue processor could know - // about the incoming poll request regardless. - FlushResult internalFlushResult = internalFlush(queue); - List<Message> flushedMessages = internalFlushResult.getMessages(); - if (flushedMessages != null && !flushedMessages.isEmpty()) - allMessages.addAll(flushedMessages); - - // Schedule a delayed flush, if necessary. - int flushWaitTime = internalFlushResult.getNextFlushWaitTimeMillis(); - if (flushWaitTime > 0) - queue.flushTask = new FlexClientScheduledFlushForPoll(queue, flushWaitTime); - - if (allMessages.size() > 0) // Flush, if there are messages. - { - FlushResult flushResult = new FlushResult(); - flushResult.setMessages(allMessages); - return flushResult; - } - return null; - } - - /** - * Notify attribute listeners that an attribute has been added. - * - * @param name The name of the attribute. - * - * @param value The new value of the attribute. - */ - protected void notifyAttributeAdded(String name, Object value) - { - if (attributeListeners != null && !attributeListeners.isEmpty()) - { - FlexClientBindingEvent event = new FlexClientBindingEvent(this, name, value); - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. - for (FlexClientAttributeListener attribListener : attributeListeners) - attribListener.attributeAdded(event); - } - } - - /** - * Notify binding listener that it has been bound to the FlexClient. - * - * @param name The attribute name. - * - * @param value The attribute that has been bound. - */ - protected void notifyAttributeBound(String name, Object value) - { - if ((value != null) && (value instanceof FlexClientBindingListener)) - { - FlexClientBindingEvent bindingEvent = new FlexClientBindingEvent(this, name); - ((FlexClientBindingListener)value).valueBound(bindingEvent); - } - } - - /** - * Notify attribute listeners that an attribute has been removed. - * - * @param name The name of the attribute. - * - * @param value The previous value of the attribute. - */ - protected void notifyAttributeRemoved(String name, Object value) - { - if (attributeListeners != null && !attributeListeners.isEmpty()) - { - FlexClientBindingEvent event = new FlexClientBindingEvent(this, name, value); - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. - for (FlexClientAttributeListener attribListener : attributeListeners) - attribListener.attributeRemoved(event); - } - } - - /** - * Notify attribute listeners that an attribute has been replaced. - * - * @param name The name of the attribute. - * - * @param value The previous value of the attribute. - */ - protected void notifyAttributeReplaced(String name, Object value) - { - if (attributeListeners != null && !attributeListeners.isEmpty()) - { - FlexClientBindingEvent event = new FlexClientBindingEvent(this, name, value); - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. - for (FlexClientAttributeListener attribListener : attributeListeners) - attribListener.attributeReplaced(event); - } - } - - /** - * Notify binding listener that it has been unbound from the FlexClient. - * - * @param name The attribute name. - * - * @param value The attribute that has been unbound. - */ - protected void notifyAttributeUnbound(String name, Object value) - { - if ((value != null) && (value instanceof FlexClientBindingListener)) - { - FlexClientBindingEvent bindingEvent = new FlexClientBindingEvent(this, name); - ((FlexClientBindingListener)value).valueUnbound(bindingEvent); - } - } - - /** - * Invoked by FlexClientManager after this new FlexClient has been constructed and - * is fully configured. - */ - protected void notifyCreated() - { - if (!createdListeners.isEmpty()) - { - // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions. - for (FlexClientListener createListener : createdListeners) - createListener.clientCreated(this); - } - } - - /** - * Utility method used to shutdown endpoint queues accessed via polling channels - * that have no more active subscriptions and no more pending outbound messages. - * - * @param queue The queue to potentially shutdown. - * @return true if the queue was cleaned up/removed; otherwise false. - */ - protected boolean shutdownQueue(EndpointQueue queue) - { - // If no more subscriptions are using the queue and it is empty, shut it down. - if (queue.messageClientRefCount == 0 && queue.messages.isEmpty()) - { - outboundQueues.remove(queue.endpointId); - // Notify any threads waiting on this queue. - synchronized (queue) - { - queue.notifyAll(); - } - return true; - } - return false; - } - - /** - * Utility method to throw a not subscribed exception back to the client - * if they issue a poll request to an endpoint that they haven't subscribed over. - * - * This method should not be called when you hold an internal thread lock. It iterates - * over all the FlexClients in the current session and will not work if two or more - * FlexClients in the same session call it simultaneously. - * - * @param endpointId The endpoint Id. - */ - protected void throwNotSubscribedException(String endpointId) - { - // Pre-3.1 versions of the client library did not handle URL session tokens properly - // and may incorrectly issue a poll, after subscribing, that does not contain the proper - // FlexClient id. - // This scenario looks like a poll from a client that is not subscribed, but it is not, - // and deserves a more useful error message. - // We determine this by checking for an (orphaned) FlexClient instance associated with the - // current session that has a subscription established through the target endpoint. - List<FlexClient> flexClients = FlexContext.getFlexSession().getFlexClients(); - for (FlexClient otherClient : flexClients) - { - if (!otherClient.equals(this)) - { - List<MessageClient> otherSubs = otherClient.getMessageClients(); - for (MessageClient otherSub : otherSubs) - { - if (otherSub.getEndpointId().equals(endpointId)) - { - // Throw not-subscribed exception with extra guidance. - FlexClientNotSubscribedException e = new FlexClientNotSubscribedException(); - e.setMessage(10036, new Object[]{endpointId}); - e.setCode(MessageService.NOT_SUBSCRIBED_CODE); - throw e; - } - } - } - } - - // Throw general not-subscribed exception. - FlexClientNotSubscribedException e = new FlexClientNotSubscribedException(); - e.setMessage(10028, new Object[]{endpointId}); - e.setCode(MessageService.NOT_SUBSCRIBED_CODE); - throw e; - } - - /** - * Updates the last-use timestamp if there are messages in the flush result. - * - * @param flushResult The flush result. - */ - protected void updateLastUseIfNecessary(FlushResult flushResult) - { - List<Message> messages = flushResult != null ? flushResult.getMessages() : null; - if (messages != null && !messages.isEmpty()) - updateLastUse(); - } - - //-------------------------------------------------------------------------- - // - // Private Methods - // - //-------------------------------------------------------------------------- - - private Set<String> getSessionAttributeNames(FlexSession session) - { - Set<String> attributeNames = new HashSet<String>(); - Enumeration<String> currentAttributeNames = session.getAttributeNames(); - while (currentAttributeNames.hasMoreElements()) - attributeNames.add(currentAttributeNames.nextElement()); - return attributeNames; - } - - private Object getSessionAttributeInCurrentSession(String name) - { - return FlexContext.getFlexSession().getAttribute(name); - } - - private Object getSessionAttributeInOtherSessions(String name) - { - FlexSession currentSession = FlexContext.getFlexSession(); - for (FlexSession session : sessions) - { - if (session == currentSession) - continue; - - Object attributeValue = session.getAttribute(name); - if (attributeValue != null) - return attributeValue; - } - return null; - } - - private void setSessionAttributeInCurrentSession(String name, Object value) - { - FlexContext.getFlexSession().setAttribute(name, value); - } - - private void setSessionAttributeInServletBasedSession(String name, Object value) - { - for (FlexSession session : sessions) - { - if (isServletBasedSession(session)) - { - session.setAttribute(name, value); - return; - } - } - } - - private boolean isCurrentSessionServletBased() - { - return isServletBasedSession(FlexContext.getFlexSession()); - } - - private boolean isServletBasedSession(FlexSession session) - { - return session instanceof HttpFlexSession; - } - - //-------------------------------------------------------------------------- - // - // Inner Classes - // - //-------------------------------------------------------------------------- - - /** - * Helper class for handling async poll requests. This class allows the response for an async poll - * to be delayed until data arrives to return to the client or the specified wait interval elapses. - * Wait timeouts are monitored by the <tt>FlexClientManager</tt> which contains a <tt>TimeoutManager</tt> - * instance that is started and stopped during application bootstrap and shutdown. Managing timeouts - * locally or statically isn't a good option because they lack a useful shutdown hook that's necessary - * in order to close down the timeout manager cleanly. - */ - public class AsyncPollWithTimeout extends TimeoutAbstractObject - { - /** - * Constructor. - * - * @param flexClient flex client - * @param session flex session - * @param queue endpoint queue - * @param handler poll handler - * @param waitIntervalMillis wait interval - * @param endpointId endpoint - */ - public AsyncPollWithTimeout(FlexClient flexClient, FlexSession session, EndpointQueue queue, AsyncPollHandler handler, long waitIntervalMillis, String endpointId) - { - this.flexClient = flexClient; - this.session = session; - this.queue = queue; - this.handler = handler; - setTimeoutPeriod(waitIntervalMillis); - flexClientManager.monitorAsyncPollTimeout(this); - this.endpointId = endpointId; - } - - private final FlexClient flexClient; - - /** - * Return client. - * - * @return flex client - */ - public FlexClient getFlexClient() - { - return flexClient; - } - - private final FlexSession session; - - /** - * Return session. - * @return flex session - */ - public FlexSession getFlexSession() - { - return session; - } - - private final EndpointQueue queue; - - /** - * Return endpoint queue. - * @return the queue - */ - public EndpointQueue getEndpointQueue() - { - return queue; - } - - private final AsyncPollHandler handler; - - /** - * Return handler. - * @return the handler - */ - public AsyncPollHandler getHandler() - { - return handler; - } - - private final String endpointId; - - /** - * Return endpoint ID. - * @return the id - */ - public String getEndpointId() - { - return endpointId; - } - - /** - * Trigger a timeout. - */ - public void timeout() - { - completeAsyncPoll(this, null /* nothing to return */); - } - } - - /** - * Helper class to flush a FlexClient's outbound queue after a specified delay. - * Delayed flushes are handled by the <tt>FlexClientManager</tt> - * using <tt>TimeoutManager</tt>. - */ - abstract class FlexClientScheduledFlush extends TimeoutAbstractObject - { - final EndpointQueue queue; - - public FlexClientScheduledFlush(EndpointQueue queue, long waitIntervalMillis) - { - this.queue = queue; - setTimeoutPeriod(waitIntervalMillis); - flexClientManager.monitorScheduledFlush(this); - } - - abstract void performFlushTask(); - - public void timeout() - { - FlexContext.setThreadLocalFlexClient(FlexClient.this); - performFlushTask(); - FlexContext.setThreadLocalFlexClient(null); - } - } - - /** - * Helper class for push channels to directly flush a FlexClient's outbound - * queue after a specified delay. - */ - class FlexClientScheduledFlushForPush extends FlexClientScheduledFlush - { - public FlexClientScheduledFlushForPush(EndpointQueue queue, long waitIntervalMillis) - { - super(queue, waitIntervalMillis); - } - - @Override - void performFlushTask() - { - synchronized (lock) - { - synchronized (queue) - { - directFlush(queue); - } - } - } - - } - - /** - * Helper class for polling channels to flush a FlexClient's outbound - * queue to flushedMessagesBetweenPolls queue after a specified delay. - * When the next poll happens, the flushedMessagesBetweenPolls will be - * drained first. - */ - class FlexClientScheduledFlushForPoll extends FlexClientScheduledFlush - { - public FlexClientScheduledFlushForPoll(EndpointQueue queue, long waitIntervalMillis) - { - super(queue, waitIntervalMillis); - } - - @Override - void performFlushTask() - { - synchronized (lock) - { - // No need to invoke flush if the FlexClient has been invalidated. - if (!valid) - return; - - // If this invocation is a callback from a flush task, null out the task ref on - // the queue to allow a subsequent delayed flush to be scheduled. - if (queue.flushTask != null) - queue.flushTask = null; - - FlushResult flushResult = internalFlush(queue); - if (flushResult == null) - return; - - List<Message> messages = flushResult.getMessages(); - if (messages != null && messages.size() > 0) - { - if (queue.asyncPoll != null) - { - completeAsyncPoll(queue.asyncPoll, flushResult); - } - else - { - if (queue.flushedMessagesBetweenPolls == null) - queue.flushedMessagesBetweenPolls = new ArrayList<Message>(); - queue.flushedMessagesBetweenPolls.addAll(messages); - } - } - - // Schedule a delayed flush, if necessary. - int flushWaitTime = flushResult.getNextFlushWaitTimeMillis(); - if (flushWaitTime > 0) - queue.flushTask = new FlexClientScheduledFlushForPoll(queue, flushWaitTime); - } - } - } - - /** - * - * Helper class that stores per-endpoint outbound queue state including: - * <ul> - * <li>flexClient - The <tt>FlexClient</tt> the queue is used by.</li> - * <li>messages - The outbound queue of messages for the endpoint.</li> - * <li>flushedMessagesBetweenPolls - Keeps track of flushed (more precisely - * drained buffered) messages between polls. A seperate list is needed - * from messages list to avoid regular flush handling.</li> - * <li>flushedMessagesBetweenPolls - Keeps track of flushed messages between polls.</li> - * <li>processor - The processor that handles adding messages to the queue as well as flushing - * them to the network.</li> - * <li>asyncPoll - The async poll to timeout or callback when messages arrive - * (null if the endpoint or session supports direct push).</li> - * <li>pushSession - A reference to a pushSession to use for direct writes to the - * client (null if the endpoint uses polling or handles push directly).</li> - * - * <li>flushTask - A reference to a pending flush task that will perform a delayed flush of the queue; - * null if no delayed flush has been scheduled.</li> - * <li>messageClientRefCount - A reference count of MessageClients subcribed over this endpoint. - * Once all MessageClients unsubscribe this queue can be shut down.</li> - * <li>avoidBusyPolling - Used to signal poll result generation for the queue to avoid busy polling.</li> - * </ul> - */ - public static class EndpointQueue - { - public FlexClient flexClient; - public String endpointId; - public Endpoint endpoint; - public List<Message> messages; - public List<Message> flushedMessagesBetweenPolls; - public FlexClientOutboundQueueProcessor processor; - public AsyncPollWithTimeout asyncPoll; - public boolean waitPoll; - public FlexSession pushSession; - public TimeoutAbstractObject flushTask; - public int messageClientRefCount; - public boolean avoidBusyPolling; - } -}
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/client/FlexClientAttributeListener.java ---------------------------------------------------------------------- diff --git a/core/src/flex/messaging/client/FlexClientAttributeListener.java b/core/src/flex/messaging/client/FlexClientAttributeListener.java deleted file mode 100644 index 2d83d0b..0000000 --- a/core/src/flex/messaging/client/FlexClientAttributeListener.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package flex.messaging.client; - -/** - * Interface for FlexClient attribute listeners. - */ -public interface FlexClientAttributeListener -{ - /** - * Callback invoked after an attribute is added to the FlexClient. - * - * @param event The event containing the associated FlexClient and attribute - * information. - */ - void attributeAdded(FlexClientBindingEvent event); - - /** - * Callback invoked after an attribute is removed from the FlexClient. - * - * @param event The event containing the associated FlexClient and attribute - * information. - */ - void attributeReplaced(FlexClientBindingEvent event); - - /** - * Callback invoked after an attribute has been replaced with a new value. - * - * @param event The event containing the associated FlexClient and attribute - * information. - */ - void attributeRemoved(FlexClientBindingEvent event); -}