http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java new file mode 100644 index 0000000..3e932cf --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java @@ -0,0 +1,402 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.internal.cq; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.StatisticDescriptor; +import com.gemstone.gemfire.Statistics; +import com.gemstone.gemfire.StatisticsFactory; +import com.gemstone.gemfire.StatisticsType; +import com.gemstone.gemfire.StatisticsTypeFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.internal.NanoTimer; +import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl; +import com.gemstone.gemfire.internal.cache.FilterProfile; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.logging.LogService; + +/** + * This class tracks GemFire statistics related to CqService. + * Specifically the following statistics are tracked: + * Number of CQs created + * Number of active CQs + * Number of CQs suspended or stopped + * Number of CQs closed + * Number of CQs on a client + * + * @author Rao Madduri + * @since 5.5 + */ +public class CqServiceVsdStats +{ + private static final Logger logger = LogService.getLogger(); + + /** The <code>StatisticsType</code> of the statistics */ + private static final StatisticsType _type; + + /** Name of the created CQs statistic */ + protected static final String CQS_CREATED = "numCqsCreated"; + + /** Name of the active CQs statistic */ + protected static final String CQS_ACTIVE = "numCqsActive"; + + /** Name of the stopped CQs statistic */ + protected static final String CQS_STOPPED = "numCqsStopped"; + + /** Name of the closed CQs statistic */ + protected static final String CQS_CLOSED = "numCqsClosed"; + + /** Name of the client's CQs statistic */ + protected static final String CQS_ON_CLIENT = "numCqsOnClient"; + + /** Number of clients with CQs statistic */ + protected static final String CLIENTS_WITH_CQS = "numClientsWithCqs"; + + + /** CQ query execution time. */ + protected static final String CQ_QUERY_EXECUTION_TIME = "cqQueryExecutionTime"; + + /** CQ query execution in progress */ + protected static final String CQ_QUERY_EXECUTION_IN_PROGRESS = "cqQueryExecutionInProgress"; + + /** Completed CQ query executions */ + protected static final String CQ_QUERY_EXECUTIONS_COMPLETED = "cqQueryExecutionsCompleted"; + + /** Unique CQs, number of different CQ queries */ + protected static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery"; + + /** Id of the CQs created statistic */ + private static final int _numCqsCreatedId; + + /** Id of the active CQs statistic */ + private static final int _numCqsActiveId; + + /** Id of the stopped CQs statistic */ + private static final int _numCqsStoppedId; + + /** Id of the closed CQs statistic */ + private static final int _numCqsClosedId; + + /** Id of the CQs on client statistic */ + private static final int _numCqsOnClientId; + + /** Id of the Clients with Cqs statistic */ + private static final int _numClientsWithCqsId; + + /** Id for the CQ query execution time. */ + private static final int _cqQueryExecutionTimeId; + + /** Id for the CQ query execution in progress */ + private static final int _cqQueryExecutionInProgressId; + + /** Id for completed CQ query executions */ + private static final int _cqQueryExecutionsCompletedId; + + /** Id for unique CQs, difference in CQ queries */ + private static final int _numUniqueCqQuery; + + /** + * Static initializer to create and initialize the <code>StatisticsType</code> + */ + static { + String statName = "CqServiceStats"; + StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton(); + + _type = f.createType(statName, statName, new StatisticDescriptor[] { + f.createLongCounter(CQS_CREATED, "Number of CQs created.", "operations"), + f.createLongCounter(CQS_ACTIVE, "Number of CQS actively executing.", "operations"), + f.createLongCounter(CQS_STOPPED, "Number of CQs stopped.", "operations"), + f.createLongCounter(CQS_CLOSED, "Number of CQs closed.", "operations"), + f.createLongCounter(CQS_ON_CLIENT, "Number of CQs on the client.", "operations"), + f.createLongCounter(CLIENTS_WITH_CQS, "Number of Clients with CQs.", "operations"), + f.createLongCounter(CQ_QUERY_EXECUTION_TIME, "Time taken for CQ Query Execution.", "nanoseconds"), + f.createLongCounter(CQ_QUERY_EXECUTIONS_COMPLETED, "Number of CQ Query Executions.", "operations"), + f.createIntGauge(CQ_QUERY_EXECUTION_IN_PROGRESS, "CQ Query Execution In Progress.", "operations"), + f.createIntGauge(UNIQUE_CQ_QUERY, "Number of Unique CQ Querys.", "Queries"), + + }); + + // Initialize id fields + _numCqsCreatedId = _type.nameToId(CQS_CREATED); + _numCqsActiveId = _type.nameToId(CQS_ACTIVE); + _numCqsStoppedId = _type.nameToId(CQS_STOPPED); + _numCqsClosedId = _type.nameToId(CQS_CLOSED); + _numCqsOnClientId = _type.nameToId(CQS_ON_CLIENT); + _numClientsWithCqsId = _type.nameToId(CLIENTS_WITH_CQS); + _cqQueryExecutionTimeId = _type.nameToId(CQ_QUERY_EXECUTION_TIME); + _cqQueryExecutionsCompletedId = _type.nameToId(CQ_QUERY_EXECUTIONS_COMPLETED); + _cqQueryExecutionInProgressId = _type.nameToId(CQ_QUERY_EXECUTION_IN_PROGRESS); + _numUniqueCqQuery = _type.nameToId(UNIQUE_CQ_QUERY); + + } + + /** The <code>Statistics</code> instance to which most behavior is delegated */ + private final Statistics _stats; + + /** + * Constructor. + * + * @param factory + * The <code>StatisticsFactory</code> which creates the + * <code>Statistics</code> instance + */ + public CqServiceVsdStats(StatisticsFactory factory) { + this._stats = factory.createAtomicStatistics(_type, "CqServiceStats"); + } + + // /////////////////// Instance Methods ///////////////////// + + /** + * Closes the <code>HARegionQueueStats</code>. + */ + public void close() + { + this._stats.close(); + } + + /** + * Returns the current value of the "numCqsCreated" stat. + * + * @return the current value of the "numCqsCreated" stat + */ + public long getNumCqsCreated() + { + return this._stats.getLong(_numCqsCreatedId); + } + + /** + * Increments the "numCqsCreated" stat by 1. + */ + public void incCqsCreated() + { + this._stats.incLong(_numCqsCreatedId, 1); + } + + /** + * Returns the current value of the "numCqsActive" stat. + * + * @return the current value of the "numCqsActive" stat + */ + public long getNumCqsActive() + { + return this._stats.getLong(_numCqsActiveId); + } + + /** + * Increments the "numCqsActive" stat by 1. + */ + public void incCqsActive() + { + this._stats.incLong(_numCqsActiveId, 1); + } + + /** + * Decrements the "numCqsActive" stat by 1. + */ + public void decCqsActive() + { + this._stats.incLong(_numCqsActiveId, -1); + } + + /** + * Returns the current value of the "numCqsStopped" stat. + * + * @return the current value of the "numCqsStopped" stat + */ + public long getNumCqsStopped() + { + return this._stats.getLong(_numCqsStoppedId); + } + + /** + * Increments the "numCqsStopped" stat by 1. + */ + public void incCqsStopped() + { + this._stats.incLong(_numCqsStoppedId, 1); + } + + /** + * Decrements the "numCqsStopped" stat by 1. + */ + public void decCqsStopped() + { + this._stats.incLong(_numCqsStoppedId, -1); + } + + /** + * Returns the current value of the "numCqsClosed" stat. + * + * @return the current value of the "numCqsClosed" stat + */ + public long getNumCqsClosed() + { + return this._stats.getLong(_numCqsClosedId); + } + + /** + * Increments the "numCqsClosed" stat by 1. + */ + public void incCqsClosed() + { + this._stats.incLong(_numCqsClosedId, 1); + } + + /** + * Returns the current value of the "numCqsOnClient" stat. + * + * @return the current value of the "numCqsOnClient" stat + */ + public long getNumCqsOnClient() + { + return this._stats.getLong(_numCqsOnClientId); + } + + /** + * Increments the "numCqsOnClient" stat by 1. + */ + public void incCqsOnClient() + { + this._stats.incLong(_numCqsOnClientId, 1); + } + + /** + * Decrements the "numCqsOnClient" stat by 1. + */ + public void decCqsOnClient() + { + this._stats.incLong(_numCqsOnClientId, -1); + } + + /** + * Returns the current value of the "numClientsWithCqs" stat. + * + * @return the current value of the "numClientsWithCqs" stat + */ + public long getNumClientsWithCqs() + { + return this._stats.getLong(_numClientsWithCqsId); + } + + /** + * Increments the "numClientsWithCqs" stat by 1. + */ + public void incClientsWithCqs() + { + this._stats.incLong(_numClientsWithCqsId, 1); + } + + /** + * Decrements the "numCqsOnClient" stat by 1. + */ + public void decClientsWithCqs() + { + this._stats.incLong(_numClientsWithCqsId, -1); + } + + /** + * Start the CQ Query Execution time. + */ + public long startCqQueryExecution() { + this._stats.incInt(_cqQueryExecutionInProgressId, 1); + return NanoTimer.getTime(); + } + + /** + * End CQ Query Execution Time. + * @param start long time value. + */ + public void endCqQueryExecution(long start) { + long ts = NanoTimer.getTime(); + this._stats.incLong(_cqQueryExecutionTimeId, ts-start); + this._stats.incInt(_cqQueryExecutionInProgressId, -1); + this._stats.incLong(_cqQueryExecutionsCompletedId, 1); + } + + /** + * Returns the total time spent executing the CQ Queries. + * @return long time spent. + */ + public long getCqQueryExecutionTime(){ + return this._stats.getLong(_cqQueryExecutionTimeId); + } + + /** + * Increments number of Unique queries. + */ + public void incUniqueCqQuery() + { + this._stats.incInt(_numUniqueCqQuery, 1); + } + + /** + * Decrements number of unique Queries. + */ + public void decUniqueCqQuery() + { + this._stats.incInt(_numUniqueCqQuery, -1); + } + + + /** + * This is a test method. It silently ignores exceptions and should not be + * used outside of unit tests.<p> + * Returns the number of CQs (active + suspended) on the given region. + * @param regionName + */ + public long numCqsOnRegion(String regionName){ + GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + if (cache == null) { + return 0; + } + DefaultQueryService queryService = (DefaultQueryService)cache.getQueryService(); + CqService cqService = null; + try { + cqService = queryService.getCqService(); + } + catch (CqException e) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to get CqService {}", e.getLocalizedMessage()); + } + e.printStackTrace(); + return -1; // We're confused + } + if (((CqServiceImpl) cqService).isServer()) { + //If we are on the server, look at the number of CQs in the filter profile. + try { + FilterProfile fp = cache.getFilterProfile(regionName); + if (fp == null) { + return 0; + } + return fp.getCqCount(); + } + catch (Exception ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to get serverside CQ count for region: {} {}", regionName, ex.getLocalizedMessage()); + } + } + } + else { + try { + CqQuery[] cqs = queryService.getCqs(regionName); + + if (cqs != null) { + return cqs.length; + } + } catch(Exception ex) { + // Dont do anything. + } + } + return 0; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java new file mode 100644 index 0000000..492a563 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java @@ -0,0 +1,67 @@ +/*========================================================================= + * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + *======================================================================== + */ + +package com.gemstone.gemfire.cache.query.internal.cq; + +import com.gemstone.gemfire.cache.query.CqStatistics; + +/** + * Provides statistical information about a CqQuery. + * + * @since 5.5 + * @author Rao Madduri + */ +public class CqStatisticsImpl implements CqStatistics { + private CqQueryImpl cqQuery; + +// private long numInserts; +// private long numDeletes; +// private long numUpdates; +// private long numEvents; + + /** + * Constructor for CqStatisticsImpl + * @param cq - CqQuery reference to the CqQueryImpl object + */ + public CqStatisticsImpl(CqQueryImpl cq) { + cqQuery = cq; + } + + /** + * Returns the number of Insert events for this CQ. + * @return the number of insert events + */ + public long numInserts() { + return this.cqQuery.getVsdStats().getNumInserts(); + } + + /** + * Returns number of Delete events for this CQ. + * @return the number of delete events + */ + public long numDeletes() { + return this.cqQuery.getVsdStats().getNumDeletes(); + } + + /** + * Returns number of Update events for this CQ. + * @return the number of update events + */ + public long numUpdates(){ + return this.cqQuery.getVsdStats().getNumUpdates(); + } + + /** + * Returns the total number of events for this CQ. + * @return the total number of insert, update, and delete events + */ + public long numEvents(){ + return cqQuery.getVsdStats().getNumEvents(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java new file mode 100644 index 0000000..385c0a5 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java @@ -0,0 +1,639 @@ +package com.gemstone.gemfire.cache.query.internal.cq; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.DataSerializable; +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.SystemFailure; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.client.internal.UserAttributes; +import com.gemstone.gemfire.cache.query.CqAttributes; +import com.gemstone.gemfire.cache.query.CqAttributesMutator; +import com.gemstone.gemfire.cache.query.CqClosedException; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqExistsException; +import com.gemstone.gemfire.cache.query.CqListener; +import com.gemstone.gemfire.cache.query.CqResults; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.QueryException; +import com.gemstone.gemfire.cache.query.RegionNotFoundException; +import com.gemstone.gemfire.cache.query.internal.CompiledBindArgument; +import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef; +import com.gemstone.gemfire.cache.query.internal.CompiledRegion; +import com.gemstone.gemfire.cache.query.internal.CompiledSelect; +import com.gemstone.gemfire.cache.query.internal.CqStateImpl; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.Token; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.i18n.StringId; + +public class ServerCQImpl extends CqQueryImpl implements DataSerializable, ServerCQ { + private static final Logger logger = LogService.getLogger(); + + /** + * This holds the keys that are part of the CQ query results. + * Using this CQ engine can determine whether to execute + * query on old value from EntryEvent, which is an expensive + * operation. + */ + private volatile HashMap<Object, Object> cqResultKeys; + + /** + * This maintains the keys that are destroyed while the Results + * Cache is getting constructed. This avoids any keys that are + * destroyed (after query execution) but is still part of the + * CQs result. + */ + private HashSet<Object> destroysWhileCqResultsInProgress; + + /** + * To indicate if the CQ results key cache is initialized. + */ + public volatile boolean cqResultKeysInitialized = false; + + /** Boolean flag to see if the CQ is on Partitioned Region */ + public volatile boolean isPR = false; + + private ClientProxyMembershipID clientProxyId = null; + + private CacheClientNotifier ccn = null; + + private String serverCqName; + + + /** identifier assigned to this query for FilterRoutingInfos */ + private Long filterID; + + public ServerCQImpl(CqServiceImpl cqService, String cqName, String queryString, boolean isDurable, String serverCqName) { + super(cqService, cqName, queryString, isDurable); + this.serverCqName = serverCqName; // On Client Side serverCqName and cqName will be same. + } + + public ServerCQImpl() { + //For deserialization + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#getFilterID() + */ + @Override + public Long getFilterID() { + return this.filterID; + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#setFilterID(java.lang.Long) + */ + @Override + public void setFilterID(Long filterID) { + this.filterID = filterID; + } + + @Override + public void setName(String cqName) { + this.cqName = this.serverCqName = cqName; + } + + @Override + public String getServerCqName() { + return this.serverCqName; + } + + @Override + public void registerCq(ClientProxyMembershipID p_clientProxyId, + CacheClientNotifier p_ccn, int p_cqState) + throws CqException, RegionNotFoundException { + + CacheClientProxy clientProxy = null; + this.clientProxyId = p_clientProxyId; + //servConnection = serverSideConnection; + + if (p_ccn != null) { + this.ccn = p_ccn; + clientProxy = p_ccn.getClientProxy(p_clientProxyId, true); + } + + /* + try { + initCq(); + } catch (CqExistsException cqe) { + // Should not happen. + throw new CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_CREATE_CQ_0_ERROR__1.toLocalizedString(new Object[] { cqName, cqe.getMessage()})); + } + */ + + validateCq(); + + final boolean isDebugEnabled = logger.isDebugEnabled(); + StringId msg = LocalizedStrings.ONE_ARG; + Throwable t = null; + try { + this.query = constructServerSideQuery(); + if (isDebugEnabled) { + logger.debug("Server side query for the cq: {} is: {}", cqName, this.query.getQueryString()); + } + } catch (Exception ex) { + t = ex; + if (ex instanceof ClassNotFoundException) { + msg = LocalizedStrings.CqQueryImpl_CLASS_NOT_FOUND_EXCEPTION_THE_ANTLRJAR_OR_THE_SPCIFIED_CLASS_MAY_BE_MISSING_FROM_SERVER_SIDE_CLASSPATH_ERROR_0; + } else { + msg = LocalizedStrings.CqQueryImpl_ERROR_WHILE_PARSING_THE_QUERY_ERROR_0; + } + } finally { + if (t != null) { + String s = msg.toLocalizedString(t); + if (isDebugEnabled) { + logger.debug(s, t); + } + throw new CqException(s); + } + } + + // Update Regions Book keeping. + // TODO replace getRegion() with getRegionByPathForProcessing() so this doesn't block + // if the region is still being initialized + this.cqBaseRegion = (LocalRegion)cqService.getCache().getRegion(regionName); + if (this.cqBaseRegion == null) { + throw new RegionNotFoundException(LocalizedStrings.CqQueryImpl_REGION__0_SPECIFIED_WITH_CQ_NOT_FOUND_CQNAME_1 + .toLocalizedString(new Object[] {regionName, this.cqName})); + } + + // Make sure that the region is partitioned or + // replicated with distributed ack or global. + DataPolicy dp = this.cqBaseRegion.getDataPolicy(); + this.isPR = dp.withPartitioning(); + if (!(this.isPR || dp.withReplication())) { + String errMsg = null; + //replicated regions with eviction set to local destroy get turned into preloaded + if (dp.withPreloaded() && cqBaseRegion.getAttributes().getEvictionAttributes() != null && cqBaseRegion.getAttributes().getEvictionAttributes().getAction().equals(EvictionAction.LOCAL_DESTROY)) { + errMsg = LocalizedStrings.CqQueryImpl_CQ_NOT_SUPPORTED_FOR_REPLICATE_WITH_LOCAL_DESTROY.toString(this.regionName, cqBaseRegion.getAttributes().getEvictionAttributes().getAction()); + } + else { + errMsg = "The region " + this.regionName + + " specified in CQ creation is neither replicated nor partitioned; " + + "only replicated or partitioned regions are allowed in CQ creation."; + } + if (isDebugEnabled){ + logger.debug(errMsg); + } + throw new CqException(errMsg); + } + if ((dp.withReplication() && + (!(cqBaseRegion.getAttributes().getScope().isDistributedAck() || + cqBaseRegion.getAttributes().getScope().isGlobal())))) { + String errMsg = "The replicated region " + this.regionName + + " specified in CQ creation does not have scope supported by CQ." + + " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL."; + if (isDebugEnabled){ + logger.debug(errMsg); + } + throw new CqException(errMsg); + } + + //checkAndSetCqOnRegion(); + + //Can be null by the time we are here + if (clientProxy != null) { + clientProxy.incCqCount(); + if (clientProxy.hasOneCq()) { + cqService.stats.incClientsWithCqs(); + } + if (isDebugEnabled) { + logger.debug("Added CQ to the base region: {} With key as: {}", cqBaseRegion.getFullPath(), serverCqName); + } + } + + // this.cqService.addToCqEventKeysMap(this); + this.updateCqCreateStats(); + + // Initialize the state of CQ. + if(this.cqState.getState() != p_cqState) { + setCqState(p_cqState); + } + + //Register is called from both filter profile and cqService + //In either case, if we are trying to start/run the cq, we need to add + //it to other matching cqs for performance reasons + if (p_cqState == CqStateImpl.RUNNING) { + // Add to the matchedCqMap. + cqService.addToMatchingCqMap(this); + } + + // Initialize CQ results (key) cache. + if(CqServiceProvider.MAINTAIN_KEYS) { + this.cqResultKeys = new HashMap <Object, Object>(); + // Currently the CQ Result keys are not cached for the Partitioned + // Regions. Supporting this with PR needs more work like forcing + // query execution on primary buckets only; and handling the bucket + // re-balancing. Once this is added remove the check with PR region. + // Only the events which are seen during event processing is + // added to the results cache (not from the CQ Results). + if (this.isPR){ + this.setCqResultsCacheInitialized(); + } else { + this.destroysWhileCqResultsInProgress = new HashSet <Object>(); + } + } + + if (p_ccn != null) { + try { + cqService.addToCqMap(this); + } catch (CqExistsException cqe) { + // Should not happen. + throw new CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_CREATE_CQ_0_ERROR__1.toLocalizedString(new Object[] { cqName, cqe.getMessage()})); + } + this.cqBaseRegion.getFilterProfile().registerCq(this); + } + } + + /** + * For Test use only. + * @return CQ Results Cache. + */ + public Set<Object> getCqResultKeyCache() { + if (this.cqResultKeys != null){ + synchronized (this.cqResultKeys) { + return Collections.synchronizedSet(new HashSet<Object>(this.cqResultKeys.keySet())); + } + } else { + return null; + } + } + + /** + * Returns parameterized query used by the server. + * This method replaces Region name with $1 and if type is not specified + * in the query, looks for type from cqattributes and appends into the + * query. + * @return String modified query. + * @throws CqException + */ + private Query constructServerSideQuery() throws QueryException { + GemFireCacheImpl cache = (GemFireCacheImpl)cqService.getCache(); + DefaultQuery locQuery = (DefaultQuery)cache.getLocalQueryService().newQuery(this.queryString); + CompiledSelect select = locQuery.getSimpleSelect(); + CompiledIteratorDef from = (CompiledIteratorDef)select.getIterators().get(0); + // WARNING: ASSUMES QUERY WAS ALREADY VALIDATED FOR PROPER "FORM" ON CLIENT; + // THIS VALIDATION WILL NEED TO BE DONE ON THE SERVER FOR NATIVE CLIENTS, + // BUT IS NOT DONE HERE FOR JAVA CLIENTS. + // The query was already checked on the client that the sole iterator is a + // CompiledRegion + this.regionName = ((CompiledRegion)from.getCollectionExpr()).getRegionPath(); + from.setCollectionExpr(new CompiledBindArgument(1)); + return locQuery; + } + + /** + * Returns if the passed key is part of the CQs result set. + * This method needs to be called once the CQ result key caching + * is completed (cqResultsCacheInitialized is true). + * @param key + * @return true if key is in the Results Cache. + */ + public boolean isPartOfCqResult(Object key) { + // Handle events that may have been deleted, + // but added by result caching. + if (this.cqResultKeys == null) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CqQueryImpl_Null_CQ_Result_Key_Cache_0)); + return false; + } + + synchronized (this.cqResultKeys) { + if (this.destroysWhileCqResultsInProgress != null) { + //this.logger.fine("Removing keys from Destroy Cache For CQ :" + + //this.cqName + " Keys :" + this.destroysWhileCqResultsInProgress); + for (Object k : this.destroysWhileCqResultsInProgress){ + this.cqResultKeys.remove(k); + } + this.destroysWhileCqResultsInProgress = null; + } + return this.cqResultKeys.containsKey(key); + } + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#addToCqResultKeys(java.lang.Object) + */ + @Override + public void addToCqResultKeys(Object key) { + if (!CqServiceProvider.MAINTAIN_KEYS){ + return; + } + + //this.logger.fine("Adding key to Results Cache For CQ :" + + //this.cqName + " key :" + key); + if (this.cqResultKeys != null) { + synchronized (this.cqResultKeys) { + this.cqResultKeys.put(key, TOKEN); + if (!this.cqResultKeysInitialized){ + // This key could be coming after add, destroy. + // Remove this from destroy queue. + //this.logger.fine("Removing key from Destroy Cache For CQ :" + + //this.cqName + " key :" + key); + if (this.destroysWhileCqResultsInProgress != null){ + this.destroysWhileCqResultsInProgress.remove(key); + } + } + } + } + } + + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#removeFromCqResultKeys(java.lang.Object, boolean) + */ + @Override + public void removeFromCqResultKeys(Object key, boolean isTokenMode) { + if (!CqServiceProvider.MAINTAIN_KEYS){ + return; + } + //this.logger.fine("Removing key from Results Cache For CQ :" + + //this.cqName + " key :" + key); + if (this.cqResultKeys != null) { + synchronized (this.cqResultKeys) { + if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED){ + return; + } + this.cqResultKeys.remove(key); + if (!this.cqResultKeysInitialized){ + //this.logger.fine("Adding key to Destroy Cache For CQ :" + + //this.cqName + " key :" + key); + if (this.destroysWhileCqResultsInProgress != null){ + this.destroysWhileCqResultsInProgress.add(key); + } + } + } + } + } + + /** + * Marks the key as destroyed in the CQ Results key cache. + * @param key + */ + public void markAsDestroyedInCqResultKeys(Object key){ + if (!CqServiceProvider.MAINTAIN_KEYS){ + return; + } + //this.logger.fine("Marking key in Results Cache For CQ :" + + // this.cqName + " key :" + key); + + if (this.cqResultKeys != null) { + synchronized (this.cqResultKeys) { + this.cqResultKeys.put(key, Token.DESTROYED); + if (!this.cqResultKeysInitialized){ + //this.logger.fine("Adding key to Destroy Cache For CQ :" + + //this.cqName + " key :" + key); + if (this.destroysWhileCqResultsInProgress != null){ + this.destroysWhileCqResultsInProgress.add(key); + } + } + } + } + } + + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#setCqResultsCacheInitialized() + */ + @Override + public void setCqResultsCacheInitialized() { + if (CqServiceProvider.MAINTAIN_KEYS) { + this.cqResultKeysInitialized = true; + } + } + + /** + * Returns the size of the CQ Result key cache. + * @return size of CQ Result key cache. + */ + public int getCqResultKeysSize() { + if (this.cqResultKeys == null) { + return 0; + } + synchronized (this.cqResultKeys) { + return this.cqResultKeys.size(); + } + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#isOldValueRequiredForQueryProcessing(java.lang.Object) + */ + @Override + public boolean isOldValueRequiredForQueryProcessing(Object key){ + if (this.cqResultKeysInitialized && this.isPartOfCqResult(key)) { + return false; + } + return true; + } + + /** + * Closes the Query. + * On Client side, sends the cq close request to server. + * On Server side, takes care of repository cleanup. + * @throws CqException + */ + public void close() throws CqClosedException, CqException { + close(true); + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#close(boolean) + */ + @Override + public void close(boolean sendRequestToServer) throws CqClosedException, CqException { + final boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("Started closing CQ CqName: {} SendRequestToServer: {}", cqName, sendRequestToServer); + } + // Synchronize with stop and execute CQ commands + synchronized(this.cqState) { + // Check if the cq is already closed. + if (this.isClosed()) { + //throw new CqClosedException("CQ is already closed, CqName : " + this.cqName); + if (isDebugEnabled){ + logger.debug("CQ is already closed, CqName: {}", this.cqName); + } + return; + } + + int stateBeforeClosing = this.cqState.getState(); + this.cqState.setState(CqStateImpl.CLOSING); + boolean isClosed = false; + + // Cleanup the resource used by cq. + this.removeFromCqMap(); + + // Stat update. + if (stateBeforeClosing == CqStateImpl.RUNNING) { + cqService.stats.decCqsActive(); + } else if (stateBeforeClosing == CqStateImpl.STOPPED) { + cqService.stats.decCqsStopped(); + } + + // Clean-up the CQ Results Cache. + if (this.cqResultKeys != null) { + synchronized (this.cqResultKeys){ + this.cqResultKeys.clear(); + } + } + + // Set the state to close, and update stats + this.cqState.setState(CqStateImpl.CLOSED); + cqService.stats.incCqsClosed(); + cqService.stats.decCqsOnClient(); + if(this.stats != null) + this.stats.close(); + } + + if (isDebugEnabled) { + logger.debug("Successfully closed the CQ. {}", cqName); + } + } + + @Override + public ClientProxyMembershipID getClientProxyId() { + return this.clientProxyId; + } + + /** + * Get CacheClientNotifier of this CqQuery. + * @return CacheClientNotifier + */ + public CacheClientNotifier getCacheClientNotifier() { + return this.ccn; + } + + /** + * Clears the resource used by CQ. + * @throws CqException + */ + protected void cleanup() throws CqException { + // CqBaseRegion + try { + if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) { + this.cqBaseRegion.getFilterProfile().closeCq(this); + CacheClientProxy clientProxy = ccn.getClientProxy(clientProxyId); + clientProxy.decCqCount(); + if (clientProxy.hasNoCq()) { + cqService.stats.decClientsWithCqs(); + } + } + }catch (Exception ex){ + // May be cache is being shutdown + if (logger.isDebugEnabled()) { + logger.debug("Failed to remove CQ from the base region. CqName :{}", cqName); + } + } + } + + /** + * @param serverCqName The serverCqName to set. + */ + public void setServerCqName(String serverCqName) { + + this.serverCqName = serverCqName; + } + + /** + * Stop or pause executing the query. + */ + public void stop()throws CqClosedException, CqException { + boolean isStopped = false; + synchronized (this.cqState) { + if (this.isClosed()) { + throw new CqClosedException(LocalizedStrings.CqQueryImpl_CQ_IS_CLOSED_CQNAME_0 + .toLocalizedString(this.cqName)); + } + + if (!(this.isRunning())) { + throw new IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0 + .toLocalizedString(this.cqName)); + } + + // Change state and stats on the client side + this.cqState.setState(CqStateImpl.STOPPED); + this.cqService.stats.incCqsStopped(); + this.cqService.stats.decCqsActive(); + if (logger.isDebugEnabled()) { + logger.debug("Successfully stopped the CQ. {}", cqName); + } + } + } + + /* DataSerializableFixedID methods ---------------------------------------- */ + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + //this.cqName = DataSerializer.readString(in); + synchronized(cqState) { + this.cqState.setState(DataSerializer.readInteger(in)); + } + this.isDurable = DataSerializer.readBoolean(in); + this.queryString = DataSerializer.readString(in); + this.filterID = in.readLong(); + } + + /* + public int getDSFID() { + return CQ_QUERY; + } + */ + + public void toData(DataOutput out) throws IOException { + //DataSerializer.writeString(this.cqName, out); + DataSerializer.writeInteger(this.cqState.getState(), out); + DataSerializer.writeBoolean(this.isDurable, out); + DataSerializer.writeString(this.queryString, out); + out.writeLong(this.filterID); + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#isPR() + */ + @Override + public boolean isPR() { + return isPR; + } + + @Override + public CqAttributes getCqAttributes() { + throw new IllegalStateException("CQ attributes are not available on the server"); + } + + @Override + public CqAttributesMutator getCqAttributesMutator() { + throw new IllegalStateException("CQ attributes are not available on the server"); + } + + @Override + public <E> CqResults<E> executeWithInitialResults() throws CqClosedException, + RegionNotFoundException, CqException { + throw new IllegalStateException("Execute cannot be called on a CQ on the server"); + } + + @Override + public void execute() throws CqClosedException, RegionNotFoundException, + CqException { + throw new IllegalStateException("Execute cannot be called on a CQ on the server"); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java new file mode 100644 index 0000000..85e886b --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java @@ -0,0 +1,43 @@ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.operations.QueryOperationContext; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.QueryException; +import com.gemstone.gemfire.cache.query.QueryInvalidException; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.cache.query.internal.CqEntry; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl; +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; +import com.gemstone.gemfire.cache.query.types.CollectionType; +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.cache.CachedDeserializable; +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand; +import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommandQuery; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.ObjectPartList; +import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.internal.security.AuthorizeRequestPP; + +public abstract class BaseCQCommand extends BaseCommandQuery { + + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java new file mode 100644 index 0000000..4f8b2ef --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java @@ -0,0 +1,125 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.*; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.security.AuthorizeRequest; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + + +public class CloseCQ extends BaseCQCommand { + + private final static CloseCQ singleton = new CloseCQ(); + + public static Command getCommand() { + return singleton; + } + + private CloseCQ() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException { + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + ClientProxyMembershipID id = servConn.getProxyID(); + CacheServerStats stats = servConn.getCacheServerStats(); + + // Based on MessageType.QUERY + // Added by Rao 2/1/2007 + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + start = DistributionStats.getStatTime(); + // Retrieve the data from the message parts + String cqName = msg.getPart(0).getString(); + + if (logger.isDebugEnabled()) { + logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(), servConn.getSocketString(), cqName); + } + + // Process the query request + if (cqName == null) { + String err = LocalizedStrings.CloseCQ_THE_CQNAME_FOR_THE_CQ_CLOSE_REQUEST_IS_NULL.toLocalizedString(); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + } + + // Process CQ close request + try { + // Append Client ID to CQ name + CqService cqService = crHelper.getCache().getCqService(); + cqService.start(); + + String serverCqName = cqName; + if (id != null) { + serverCqName = cqService.constructServerCqName(cqName, id); + } + InternalCqQuery cqQuery = cqService.getCq(serverCqName); + + AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + if (authzRequest != null) { + String queryStr = null; + Set cqRegionNames = null; + + if (cqQuery != null) { + queryStr = cqQuery.getQueryString(); + cqRegionNames = new HashSet(); + cqRegionNames.add(((InternalCqQuery)cqQuery).getRegionName()); + authzRequest.closeCQAuthorize(cqName, queryStr, cqRegionNames); + } + + } + // String cqNameWithClientId = new String(cqName + "__" + + // getMembershipID()); + cqService.closeCq(cqName, id); + if(cqQuery != null) + servConn.removeCq(cqName, cqQuery.isDurable()); + } + catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + return; + } + catch (Exception e) { + String err = LocalizedStrings.CloseCQ_EXCEPTION_WHILE_CLOSING_CQ_CQNAME_0.toLocalizedString(cqName); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, + msg.getTransactionId(), e, servConn); + return; + } + + // Send OK to client + sendCqResponse(MessageType.REPLY, LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), msg.getTransactionId(), null, servConn); + servConn.setAsTrue(RESPONDED); + + { + long oldStart = start; + start = DistributionStats.getStatTime(); + stats.incProcessCloseCqTime(start - oldStart); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java new file mode 100644 index 0000000..9f111fc --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java @@ -0,0 +1,162 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.*; +import com.gemstone.gemfire.cache.operations.ExecuteCQOperationContext; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.security.AuthorizeRequest; + +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + + +public class ExecuteCQ extends BaseCQCommand { + protected static final Logger logger = LogService.getLogger(); + + private final static ExecuteCQ singleton = new ExecuteCQ(); + + public static Command getCommand() { + return singleton; + } + + private ExecuteCQ() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException, InterruptedException { + AcceptorImpl acceptor = servConn.getAcceptor(); + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + ClientProxyMembershipID id = servConn.getProxyID(); + CacheServerStats stats = servConn.getCacheServerStats(); + + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + // Retrieve the data from the message parts + String cqName = msg.getPart(0).getString(); + String cqQueryString = msg.getPart(1).getString(); + int cqState = msg.getPart(2).getInt(); + + Part isDurablePart = msg.getPart(3); + byte[] isDurableByte = isDurablePart.getSerializedForm(); + boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false + : true; + if (logger.isDebugEnabled()) { + logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName, cqQueryString); + } + + DefaultQueryService qService = null; + CqService cqServiceForExec = null; + Query query = null; + Set cqRegionNames = null; + ExecuteCQOperationContext executeCQContext = null; + ServerCQ cqQuery = null; + + try { + qService = (DefaultQueryService)((GemFireCacheImpl)crHelper.getCache()).getLocalQueryService(); + + // Authorization check + AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + if (authzRequest != null) { + query = qService.newQuery(cqQueryString); + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + executeCQContext = authzRequest.executeCQAuthorize(cqName, + cqQueryString, cqRegionNames); + String newCqQueryString = executeCQContext.getQuery(); + + if (!cqQueryString.equals(newCqQueryString)) { + query = qService.newQuery(newCqQueryString); + cqQueryString = newCqQueryString; + cqRegionNames = executeCQContext.getRegionNames(); + if (cqRegionNames == null) { + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + } + } + } + + cqServiceForExec = qService.getCqService(); + cqQuery = cqServiceForExec.executeCq(cqName, cqQueryString, + cqState, id, acceptor.getCacheClientNotifier(), isDurable, false, 0, null); + } + catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + return; + } + catch (Exception e) { + writeChunkedException(msg, e, false, servConn); + return; + } + + long oldstart = start; + boolean sendResults = false; + boolean successQuery = false; + + if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { + sendResults = true; + } + + // Execute the query and send the result-set to client. + try { + if (query == null) { + query = qService.newQuery(cqQueryString); + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + } + ((DefaultQuery)query).setIsCqQuery(true); + successQuery = processQuery(msg, query, cqQueryString, + cqRegionNames, start, cqQuery, executeCQContext, servConn, sendResults); + + // Update the CQ statistics. + cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) - oldstart); + stats.incProcessExecuteCqWithIRTime((DistributionStats.getStatTime()) - oldstart); + //logger.fine("Time spent in execute with initial results :" + DistributionStats.getStatTime() + ", " + oldstart); + } finally { // To handle any exception. + // If failure to execute the query, close the CQ. + if (!successQuery) { + try { + cqServiceForExec.closeCq(cqName, id); + } + catch (Exception ex) { + // Ignore. + } + } + } + + if (!sendResults && successQuery) { + // Send OK to client + sendCqResponse(MessageType.REPLY, LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), msg + .getTransactionId(), null, servConn); + + long start2 = DistributionStats.getStatTime(); + stats.incProcessCreateCqTime(start2 - oldstart); + } + servConn.setAsTrue(RESPONDED); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java new file mode 100755 index 0000000..8e19ef3 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java @@ -0,0 +1,214 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.operations.ExecuteCQOperationContext; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider; +import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.Part; +import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; +import com.gemstone.gemfire.internal.cache.vmotion.VMotionObserver; +import com.gemstone.gemfire.internal.cache.vmotion.VMotionObserverHolder; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.security.AuthorizeRequest; + + +/** + * @author aingle + * @since 6.1 + */ + +public class ExecuteCQ61 extends BaseCQCommand { + protected static final Logger logger = LogService.getLogger(); + + private final static ExecuteCQ61 singleton = new ExecuteCQ61(); + + public static Command getCommand() { + return singleton; + } + + private ExecuteCQ61() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException, InterruptedException { + AcceptorImpl acceptor = servConn.getAcceptor(); + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + ClientProxyMembershipID id = servConn.getProxyID(); + CacheServerStats stats = servConn.getCacheServerStats(); + + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + // Retrieve the data from the message parts + String cqName = msg.getPart(0).getString(); + String cqQueryString = msg.getPart(1).getString(); + int cqState = msg.getPart(2).getInt(); + + Part isDurablePart = msg.getPart(3); + byte[] isDurableByte = isDurablePart.getSerializedForm(); + boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? false + : true; +// region data policy + Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts()-1); + byte[] regionDataPolicyPartBytes = regionDataPolicyPart.getSerializedForm(); + if (logger.isDebugEnabled()) { + logger.debug("{}: Received {} request from {} CqName: {} queryString: {}", servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getSocketString(), cqName, cqQueryString); + } + + // Check if the Server is running in NotifyBySubscription=true mode. + CacheClientNotifier ccn = acceptor.getCacheClientNotifier(); + if (ccn != null) { + CacheClientProxy proxy = ccn.getClientProxy(id); + if (proxy != null && !proxy.isNotifyBySubscription()) { + // This should have been taken care at the client. + String err = LocalizedStrings.ExecuteCQ_SERVER_NOTIFYBYSUBSCRIPTION_MODE_IS_SET_TO_FALSE_CQ_EXECUTION_IS_NOT_SUPPORTED_IN_THIS_MODE.toLocalizedString(); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + } + } + + DefaultQueryService qService = null; + CqServiceImpl cqServiceForExec = null; + Query query = null; + Set cqRegionNames = null; + ExecuteCQOperationContext executeCQContext = null; + ServerCQImpl cqQuery = null; + + try { + qService = (DefaultQueryService)((GemFireCacheImpl)crHelper.getCache()).getLocalQueryService(); + + // Authorization check + AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + if (authzRequest != null) { + query = qService.newQuery(cqQueryString); + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + executeCQContext = authzRequest.executeCQAuthorize(cqName, + cqQueryString, cqRegionNames); + String newCqQueryString = executeCQContext.getQuery(); + + if (!cqQueryString.equals(newCqQueryString)) { + query = qService.newQuery(newCqQueryString); + cqQueryString = newCqQueryString; + cqRegionNames = executeCQContext.getRegionNames(); + if (cqRegionNames == null) { + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + } + } + } + + // test hook to trigger vMotion during CQ registration + + if (CqServiceProvider.VMOTION_DURING_CQ_REGISTRATION_FLAG) { + VMotionObserver vmo = VMotionObserverHolder.getInstance(); + vmo.vMotionBeforeCQRegistration(); + } + + cqServiceForExec = (CqServiceImpl) qService.getCqService(); + //registering cq with serverConnection so that when CCP will require auth info it can access that + //registering cq auth before as possibility that you may get event + servConn.setCq(cqName, isDurable); + cqQuery = (ServerCQImpl) cqServiceForExec.executeCq(cqName, cqQueryString, cqState, + id, ccn, isDurable, true, regionDataPolicyPartBytes[0], null); + } + catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + servConn.removeCq(cqName, isDurable); + return; + } + catch (Exception e) { + writeChunkedException(msg, e, false, servConn); + servConn.removeCq(cqName, isDurable); + return; + } + + long oldstart = start; + boolean sendResults = false; + boolean successQuery = false; + + if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) { + sendResults = true; + } + + // Execute the query only if it is execute with initial results or + // if it is a non PR query with execute query and maintain keys flags set + if(sendResults || (CqServiceImpl.EXECUTE_QUERY_DURING_INIT && CqServiceProvider.MAINTAIN_KEYS && !cqQuery.isPR())) { + // Execute the query and send the result-set to client. + try { + if (query == null) { + query = qService.newQuery(cqQueryString); + cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null); + } + ((DefaultQuery)query).setIsCqQuery(true); + successQuery = processQuery(msg, query, cqQueryString, + cqRegionNames, start, cqQuery, executeCQContext, servConn, sendResults); + + + // Update the CQ statistics. + cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) - oldstart); + stats.incProcessExecuteCqWithIRTime((DistributionStats.getStatTime()) - oldstart); + //logger.fine("Time spent in execute with initial results :" + DistributionStats.getStatTime() + ", " + oldstart); + } finally { // To handle any exception. + // If failure to execute the query, close the CQ. + if (!successQuery) { + try { + cqServiceForExec.closeCq(cqName, id); + } + catch (Exception ex) { + // Ignore. + } + } + } + } else { + // Don't execute query for cq.execute and + // if it is a PR query with execute query and maintain keys flags not set + cqQuery.cqResultKeysInitialized = true; + successQuery = true; + } + + if (!sendResults && successQuery) { + // Send OK to client + sendCqResponse(MessageType.REPLY, LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), msg + .getTransactionId(), null, servConn); + + long start2 = DistributionStats.getStatTime(); + stats.incProcessCreateCqTime(start2 - oldstart); + } + servConn.setAsTrue(RESPONDED); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java new file mode 100644 index 0000000..b483ddf --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java @@ -0,0 +1,94 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + + +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.*; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; + +import java.io.IOException; + + +public class GetCQStats extends BaseCQCommand { + + private final static GetCQStats singleton = new GetCQStats(); + + public static Command getCommand() { + return singleton; + } + + private GetCQStats() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException { + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + + CacheServerStats stats = servConn.getCacheServerStats(); + + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + final boolean isDebugEnabled = logger.isDebugEnabled(); + if (isDebugEnabled) { + logger.debug("{}: Received close all client CQs request from {}", servConn.getName(), servConn.getSocketString()); + } + + // Retrieve the data from the message parts + String cqName = msg.getPart(0).getString(); + + if (isDebugEnabled) { + logger.debug("{}: Received close CQ request from {} cqName: {}", servConn.getName(), servConn.getSocketString(), cqName); + } + + // Process the query request + if (cqName == null) { + String err = "The cqName for the cq stats request is null"; + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + + } + else { + // Process the cq request + try { + // make sure the cqservice has been created + // since that is what registers the stats + CqService cqService = crHelper.getCache().getCqService(); + cqService.start(); + } + catch (Exception e) { + String err = "Exception while Getting the CQ Statistics. "; + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg + .getTransactionId(), e, servConn); + return; + } + } + // Send OK to client + sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", msg + .getTransactionId(), null, servConn); + servConn.setAsTrue(RESPONDED); + + { + long oldStart = start; + start = DistributionStats.getStatTime(); + stats.incProcessGetCqStatsTime(start - oldStart); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java new file mode 100755 index 0000000..f22d141 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java @@ -0,0 +1,137 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.operations.GetDurableCQsOperationContext; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.security.AuthorizeRequest; + + +public class GetDurableCQs extends BaseCQCommand { + + private final static GetDurableCQs singleton = new GetDurableCQs(); + + public static Command getCommand() { + return singleton; + } + + private GetDurableCQs() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException, InterruptedException { + AcceptorImpl acceptor = servConn.getAcceptor(); + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + ClientProxyMembershipID id = servConn.getProxyID(); + CacheServerStats stats = servConn.getCacheServerStats(); + + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + if (logger.isDebugEnabled()) { + logger.debug("{}: Received {} request from {}", servConn.getName(), MessageType.getString(msg.getMessageType()), servConn.getSocketString()); + } + + DefaultQueryService qService = null; + CqService cqServiceForExec = null; + Query query = null; + Set cqRegionNames = null; + GetDurableCQsOperationContext getDurableCqsOperationContext = null; + InternalCqQuery cqQuery = null; + + try { + qService = (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache()) + .getLocalQueryService(); + + // Authorization check + AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + if (authzRequest != null) { + authzRequest.getDurableCQsAuthorize(); + } + + cqServiceForExec = qService.getCqService(); + List<String> durableCqs = cqServiceForExec.getAllDurableClientCqs(id); + + ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + chunkedResponseMsg.setMessageType(MessageType.RESPONSE); + chunkedResponseMsg.setTransactionId(msg.getTransactionId()); + chunkedResponseMsg.sendHeader(); + + List durableCqList = new ArrayList(maximumChunkSize); + final boolean isTraceEnabled = logger.isTraceEnabled(); + for (Iterator it = durableCqs.iterator(); it.hasNext();) { + Object durableCqName = it.next(); + durableCqList.add(durableCqName); + if (isTraceEnabled) { + logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", servConn.getName(), durableCqName, durableCqList.size()); + } + if (durableCqList.size() == maximumChunkSize) { + // Send the chunk and clear the list + sendDurableCqsResponseChunk(durableCqList, false, servConn); + durableCqList.clear(); + } + } + // Send the last chunk even if the list is of zero size. + sendDurableCqsResponseChunk(durableCqList, true, servConn); + + } catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + return; + } catch (Exception e) { + writeChunkedException(msg, e, false, servConn); + return; + } + } + + private void sendDurableCqsResponseChunk(List list, boolean lastChunk, + ServerConnection servConn) throws IOException { + ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage(); + + chunkedResponseMsg.setNumberOfParts(1); + chunkedResponseMsg.setLastChunk(lastChunk); + chunkedResponseMsg.addObjPart(list, zipValues); + + if (logger.isDebugEnabled()) { + logger.debug("{}: Sending {} durableCQs response chunk{}", servConn.getName(), (lastChunk ? " last " : " "), (logger.isTraceEnabled() ? " keys=" + list + " chunk=<" + chunkedResponseMsg + ">" : "")); + } + + chunkedResponseMsg.sendChunk(servConn); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java new file mode 100644 index 0000000..bd77cee --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java @@ -0,0 +1,94 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + + +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.*; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; + +import java.io.IOException; + +public class MonitorCQ extends BaseCQCommand { + + private final static MonitorCQ singleton = new MonitorCQ(); + + public static Command getCommand() { + return singleton; + } + + private MonitorCQ() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException { + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + int op = msg.getPart(0).getInt(); + + if (op < 1) { + // This should have been taken care at the client - remove? + String err = LocalizedStrings.MonitorCQ__0_THE_MONITORCQ_OPERATION_IS_INVALID.toLocalizedString(servConn.getName()); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + } + + String regionName = null; + if (msg.getNumberOfParts() == 2) { + // This will be enable/disable on region. + regionName = msg.getPart(1).getString(); + if (regionName == null) { + // This should have been taken care at the client - remove? + String err = LocalizedStrings.MonitorCQ__0_A_NULL_REGION_NAME_WAS_PASSED_FOR_MONITORCQ_OPERATION.toLocalizedString(servConn.getName()); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + } + } + + if (logger.isDebugEnabled()) { + logger.debug("{}: Received MonitorCq request from {} op: {}{}", servConn.getName(), servConn.getSocketString(), op, (regionName != null) ? " RegionName: " + regionName : ""); + } + + try { + CqService cqService = crHelper.getCache().getCqService(); + cqService.start(); + // The implementation of enable/disable cq is changed. + // Instead calling enable/disable client calls execute/stop methods + // at cache and region level. + // This method is retained for future purpose, to support admin level apis + // similar to enable/disable at system/client level. + // Should never come. + throw new CqException (LocalizedStrings.CqService_INVALID_CQ_MONITOR_REQUEST_RECEIVED.toLocalizedString()); + } + catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + return; + } + catch (Exception e) { + String err = LocalizedStrings.MonitorCQ_EXCEPTION_WHILE_HANDLING_THE_MONITOR_REQUEST_OP_IS_0.toLocalizedString(Integer.valueOf(op)); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, + msg.getTransactionId(), e, servConn); + return; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java new file mode 100644 index 0000000..157158e --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java @@ -0,0 +1,129 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.tier.sockets.command; + +import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.*; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.distributed.internal.DistributionStats; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.security.AuthorizeRequest; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + + +public class StopCQ extends BaseCQCommand { + + private final static StopCQ singleton = new StopCQ(); + + public static Command getCommand() { + return singleton; + } + + private StopCQ() { + } + + @Override + public void cmdExecute(Message msg, ServerConnection servConn, long start) + throws IOException { + CachedRegionHelper crHelper = servConn.getCachedRegionHelper(); + ClientProxyMembershipID id = servConn.getProxyID(); + CacheServerStats stats = servConn.getCacheServerStats(); + + // Based on MessageType.QUERY + // Added by Rao 2/1/2007 + servConn.setAsTrue(REQUIRES_RESPONSE); + servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE); + + start = DistributionStats.getStatTime(); + // Retrieve the data from the message parts + String cqName = msg.getPart(0).getString(); + + if (logger.isDebugEnabled()) { + logger.debug("{}: Received stop CQ request from {} cqName: {}", servConn.getName(), servConn.getSocketString(), cqName); + } + + // Process the query request + if (cqName == null) { + String err = LocalizedStrings.StopCQ_THE_CQNAME_FOR_THE_CQ_STOP_REQUEST_IS_NULL.toLocalizedString(); + sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg + .getTransactionId(), null, servConn); + return; + } + + // Process CQ stop request + try { + // Append Client ID to CQ name + CqService cqService = crHelper.getCache().getCqService(); + cqService.start(); + // String cqNameWithClientId = new String(cqName + "__" + + // getMembershipID()); + String serverCqName = cqName; + if (id != null) { + serverCqName = cqService.constructServerCqName(cqName, id); + } + InternalCqQuery cqQuery = cqService.getCq(serverCqName); + + AuthorizeRequest authzRequest = servConn.getAuthzRequest(); + if (authzRequest != null) { + String queryStr = null; + Set cqRegionNames = null; + + if (cqQuery != null) { + queryStr = cqQuery.getQueryString(); + cqRegionNames = new HashSet(); + cqRegionNames.add(((CqQueryImpl)cqQuery).getRegionName()); + } + authzRequest.stopCQAuthorize(cqName, queryStr, cqRegionNames); + } + cqService.stopCq(cqName, id); + if(cqQuery != null) + servConn.removeCq(cqName, cqQuery.isDurable()); + } + catch (CqException cqe) { + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(), + cqe, servConn); + return; + } + catch (Exception e) { + String err = LocalizedStrings.StopCQ_EXCEPTION_WHILE_STOPPING_CQ_NAMED_0 + .toLocalizedString(cqName); + sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, + msg.getTransactionId(), e, servConn); + return; + } + + // Send OK to client + sendCqResponse(MessageType.REPLY, LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), msg + .getTransactionId(), null, servConn); + + servConn.setAsTrue(RESPONDED); + + { + long oldStart = start; + start = DistributionStats.getStatTime(); + stats.incProcessStopCqTime(start - oldStart); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory b/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory new file mode 100644 index 0000000..0c51c3d --- /dev/null +++ b/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory @@ -0,0 +1 @@ +com.gemstone.gemfire.cache.query.internal.cq.CqServiceFactoryImpl