http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java
new file mode 100644
index 0000000..3e932cf
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceVsdStats.java
@@ -0,0 +1,402 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.internal.cq;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+import com.gemstone.gemfire.internal.cache.FilterProfile;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This class tracks GemFire statistics related to CqService.
+ * Specifically the following statistics are tracked:
+ *      Number of CQs created
+ *      Number of active CQs
+ *      Number of CQs suspended or stopped
+ *      Number of CQs closed
+ *      Number of CQs on a client 
+ * 
+ * @author Rao Madduri
+ * @since 5.5 
+ */
+public class CqServiceVsdStats
+{
+  private static final Logger logger = LogService.getLogger();
+  
+  /** The <code>StatisticsType</code> of the statistics */
+  private static final StatisticsType _type;
+
+  /** Name of the created CQs statistic */
+  protected static final String CQS_CREATED = "numCqsCreated";
+
+  /** Name of the active CQs statistic */
+  protected static final String CQS_ACTIVE = "numCqsActive";
+
+  /** Name of the stopped CQs statistic */
+  protected static final String CQS_STOPPED = "numCqsStopped";
+
+  /** Name of the closed CQs statistic */
+  protected static final String CQS_CLOSED = "numCqsClosed";
+
+  /** Name of the client's CQs statistic */
+  protected static final String CQS_ON_CLIENT = "numCqsOnClient";
+  
+  /** Number of clients with CQs statistic */
+  protected static final String CLIENTS_WITH_CQS = "numClientsWithCqs";
+
+  
+  /** CQ query execution time. */
+  protected static final String CQ_QUERY_EXECUTION_TIME = 
"cqQueryExecutionTime";
+
+  /** CQ query execution in progress */
+  protected static final String CQ_QUERY_EXECUTION_IN_PROGRESS = 
"cqQueryExecutionInProgress";
+
+  /** Completed CQ query executions */
+  protected static final String CQ_QUERY_EXECUTIONS_COMPLETED = 
"cqQueryExecutionsCompleted";
+
+  /** Unique CQs, number of different CQ queries */
+  protected static final String UNIQUE_CQ_QUERY = "numUniqueCqQuery";
+  
+  /** Id of the CQs created statistic */
+  private static final int _numCqsCreatedId;
+
+  /** Id of the active CQs statistic */
+  private static final int _numCqsActiveId;
+
+  /** Id of the stopped CQs statistic */
+  private static final int _numCqsStoppedId;
+
+  /** Id of the closed CQs statistic */
+  private static final int _numCqsClosedId;
+
+  /** Id of the CQs on client statistic */
+  private static final int _numCqsOnClientId;
+  
+  /** Id of the Clients with Cqs statistic */
+  private static final int _numClientsWithCqsId;
+
+  /** Id for the CQ query execution time. */
+  private static final int _cqQueryExecutionTimeId;
+
+  /** Id for the CQ query execution in progress */
+  private static final int _cqQueryExecutionInProgressId;
+
+  /** Id for completed CQ query executions */
+  private static final int _cqQueryExecutionsCompletedId;
+
+  /** Id for unique CQs, difference in CQ queries */
+  private static final int _numUniqueCqQuery;
+
+  /**
+   * Static initializer to create and initialize the 
<code>StatisticsType</code>
+   */
+  static {
+    String statName = "CqServiceStats";
+    StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
+
+    _type = f.createType(statName, statName, new StatisticDescriptor[] {
+        f.createLongCounter(CQS_CREATED, "Number of CQs created.", 
"operations"),
+        f.createLongCounter(CQS_ACTIVE, "Number of CQS actively executing.", 
"operations"),
+        f.createLongCounter(CQS_STOPPED, "Number of CQs stopped.", 
"operations"),
+        f.createLongCounter(CQS_CLOSED, "Number of CQs closed.", "operations"),
+        f.createLongCounter(CQS_ON_CLIENT, "Number of CQs on the client.", 
"operations"),
+        f.createLongCounter(CLIENTS_WITH_CQS, "Number of Clients with CQs.", 
"operations"),
+        f.createLongCounter(CQ_QUERY_EXECUTION_TIME, "Time taken for CQ Query 
Execution.", "nanoseconds"),
+        f.createLongCounter(CQ_QUERY_EXECUTIONS_COMPLETED, "Number of CQ Query 
Executions.", "operations"),
+        f.createIntGauge(CQ_QUERY_EXECUTION_IN_PROGRESS, "CQ Query Execution 
In Progress.", "operations"),
+        f.createIntGauge(UNIQUE_CQ_QUERY, "Number of Unique CQ Querys.", 
"Queries"),
+
+    });
+
+    // Initialize id fields
+    _numCqsCreatedId = _type.nameToId(CQS_CREATED);
+    _numCqsActiveId = _type.nameToId(CQS_ACTIVE);
+    _numCqsStoppedId = _type.nameToId(CQS_STOPPED);
+    _numCqsClosedId = _type.nameToId(CQS_CLOSED);
+    _numCqsOnClientId = _type.nameToId(CQS_ON_CLIENT);
+    _numClientsWithCqsId = _type.nameToId(CLIENTS_WITH_CQS);
+    _cqQueryExecutionTimeId = _type.nameToId(CQ_QUERY_EXECUTION_TIME);    
+    _cqQueryExecutionsCompletedId = 
_type.nameToId(CQ_QUERY_EXECUTIONS_COMPLETED);
+    _cqQueryExecutionInProgressId = 
_type.nameToId(CQ_QUERY_EXECUTION_IN_PROGRESS);
+    _numUniqueCqQuery = _type.nameToId(UNIQUE_CQ_QUERY);
+    
+  }
+
+  /** The <code>Statistics</code> instance to which most behavior is delegated 
*/
+  private final Statistics _stats;
+
+  /**
+   * Constructor.
+   * 
+   * @param factory
+   *          The <code>StatisticsFactory</code> which creates the
+   *          <code>Statistics</code> instance
+   */
+  public CqServiceVsdStats(StatisticsFactory factory) {
+    this._stats = factory.createAtomicStatistics(_type, "CqServiceStats");
+  }
+
+  // /////////////////// Instance Methods /////////////////////
+
+  /**
+   * Closes the <code>HARegionQueueStats</code>.
+   */
+  public void close()
+  {
+    this._stats.close();
+  }
+
+  /**
+   * Returns the current value of the "numCqsCreated" stat.
+   * 
+   * @return the current value of the "numCqsCreated" stat
+   */
+  public long getNumCqsCreated()
+  {
+    return this._stats.getLong(_numCqsCreatedId);
+  }
+
+  /**
+   * Increments the "numCqsCreated" stat by 1.
+   */
+  public void incCqsCreated()
+  {
+    this._stats.incLong(_numCqsCreatedId, 1);
+  }
+
+  /**
+   * Returns the current value of the "numCqsActive" stat.
+   * 
+   * @return the current value of the "numCqsActive" stat
+   */
+  public long getNumCqsActive()
+  {
+    return this._stats.getLong(_numCqsActiveId);
+  }
+
+  /**
+   * Increments the "numCqsActive" stat by 1.
+   */
+  public void incCqsActive()
+  {
+    this._stats.incLong(_numCqsActiveId, 1);
+  }
+  
+  /**
+   * Decrements the "numCqsActive" stat by 1.
+   */
+  public void decCqsActive()
+  {
+    this._stats.incLong(_numCqsActiveId, -1);
+  }
+  
+  /**
+   * Returns the current value of the "numCqsStopped" stat.
+   * 
+   * @return the current value of the "numCqsStopped" stat
+   */
+  public long getNumCqsStopped()
+  {
+    return this._stats.getLong(_numCqsStoppedId);
+  }
+
+  /**
+   * Increments the "numCqsStopped" stat by 1.
+   */
+  public void incCqsStopped()
+  {
+    this._stats.incLong(_numCqsStoppedId, 1);
+  }
+
+  /**
+   * Decrements the "numCqsStopped" stat by 1.
+   */
+  public void decCqsStopped()
+  {
+    this._stats.incLong(_numCqsStoppedId, -1);
+  }
+
+  /**
+   * Returns the current value of the "numCqsClosed" stat.
+   * 
+   * @return the current value of the "numCqsClosed" stat
+   */
+  public long getNumCqsClosed()
+  {
+    return this._stats.getLong(_numCqsClosedId);
+  }
+
+  /**
+   * Increments the "numCqsClosed" stat by 1.
+   */
+  public void incCqsClosed()
+  {
+    this._stats.incLong(_numCqsClosedId, 1);
+  }
+
+  /**
+   * Returns the current value of the "numCqsOnClient" stat.
+   * 
+   * @return the current value of the "numCqsOnClient" stat
+   */
+  public long getNumCqsOnClient()
+  {
+    return this._stats.getLong(_numCqsOnClientId);
+  }
+
+  /**
+   * Increments the "numCqsOnClient" stat by 1.
+   */
+  public void incCqsOnClient()
+  {
+    this._stats.incLong(_numCqsOnClientId, 1);
+  }
+  
+  /**
+   * Decrements the "numCqsOnClient" stat by 1.
+   */
+  public void decCqsOnClient()
+  {
+    this._stats.incLong(_numCqsOnClientId, -1);
+  }
+  
+  /**
+   * Returns the current value of the "numClientsWithCqs" stat.
+   * 
+   * @return the current value of the "numClientsWithCqs" stat
+   */
+  public long getNumClientsWithCqs()
+  {
+    return this._stats.getLong(_numClientsWithCqsId);
+  }
+
+  /**
+   * Increments the "numClientsWithCqs" stat by 1.
+   */
+  public void incClientsWithCqs()
+  {
+    this._stats.incLong(_numClientsWithCqsId, 1);
+  }
+  
+  /**
+   * Decrements the "numCqsOnClient" stat by 1.
+   */
+  public void decClientsWithCqs()
+  {
+    this._stats.incLong(_numClientsWithCqsId, -1);
+  }
+  
+  /**
+   * Start the CQ Query Execution time.
+   */
+  public long startCqQueryExecution() {
+    this._stats.incInt(_cqQueryExecutionInProgressId, 1);
+    return NanoTimer.getTime(); 
+  }
+
+  /**
+   * End CQ Query Execution Time.
+   * @param start long time value.
+   */
+  public void endCqQueryExecution(long start) {
+    long ts = NanoTimer.getTime(); 
+    this._stats.incLong(_cqQueryExecutionTimeId, ts-start);
+    this._stats.incInt(_cqQueryExecutionInProgressId, -1);
+    this._stats.incLong(_cqQueryExecutionsCompletedId, 1);
+  }
+  
+  /**
+   * Returns the total time spent executing the CQ Queries.
+   * @return long time spent.
+   */
+  public long getCqQueryExecutionTime(){
+    return this._stats.getLong(_cqQueryExecutionTimeId);
+  }
+
+  /**
+   * Increments number of Unique queries.
+   */
+  public void incUniqueCqQuery()
+  {
+    this._stats.incInt(_numUniqueCqQuery, 1);
+  }
+
+  /**
+   * Decrements number of unique Queries.
+   */
+  public void decUniqueCqQuery()
+  {
+    this._stats.incInt(_numUniqueCqQuery, -1);
+  }
+  
+  
+  /**
+   * This is a test method.  It silently ignores exceptions and should not be
+   * used outside of unit tests.<p>
+   * Returns the number of CQs (active + suspended) on the given region.
+   * @param regionName
+   */
+  public long numCqsOnRegion(String regionName){
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if (cache == null) {
+      return 0;
+    }
+    DefaultQueryService queryService = 
(DefaultQueryService)cache.getQueryService();
+    CqService cqService = null;
+    try {
+      cqService = queryService.getCqService();
+    }
+    catch (CqException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Failed to get CqService {}", e.getLocalizedMessage());
+      }
+      e.printStackTrace();
+      return -1; // We're confused
+    }
+    if (((CqServiceImpl) cqService).isServer()) {
+      //If we are on the server, look at the number of CQs in the filter 
profile.
+      try {
+        FilterProfile fp = cache.getFilterProfile(regionName);
+        if (fp == null) {
+          return 0;
+        }
+        return fp.getCqCount();
+      }
+      catch (Exception ex) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Failed to get serverside CQ count for region: {} {}", 
regionName, ex.getLocalizedMessage());
+        }
+      }
+    }
+    else {
+      try {
+        CqQuery[] cqs = queryService.getCqs(regionName);
+        
+        if (cqs != null) {
+          return cqs.length;
+        }
+      } catch(Exception ex) {
+        // Dont do anything.
+      }
+    }
+    return 0;
+  } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java
new file mode 100644
index 0000000..492a563
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqStatisticsImpl.java
@@ -0,0 +1,67 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.query.internal.cq;
+
+import com.gemstone.gemfire.cache.query.CqStatistics;
+
+/**
+ * Provides statistical information about a CqQuery.
+ * 
+ * @since 5.5
+ * @author Rao Madduri
+ */
+public class CqStatisticsImpl implements CqStatistics {
+  private CqQueryImpl cqQuery;
+  
+//  private long numInserts;
+//  private long numDeletes;
+//  private long numUpdates;
+//  private long numEvents;
+  
+  /**
+   * Constructor for CqStatisticsImpl
+   * @param cq - CqQuery reference to the CqQueryImpl object
+   */
+  public CqStatisticsImpl(CqQueryImpl cq) {
+    cqQuery = cq;
+  }
+  
+  /**
+   * Returns the number of Insert events for this CQ.
+   * @return the number of insert events
+   */
+  public long numInserts() {
+    return this.cqQuery.getVsdStats().getNumInserts();
+  }
+  
+  /**
+   * Returns number of Delete events for this CQ.
+   * @return the number of delete events
+   */
+  public long numDeletes() {
+    return this.cqQuery.getVsdStats().getNumDeletes();
+  }
+  
+  /**
+   * Returns number of Update events for this CQ.
+   * @return the number of update events
+   */
+  public long numUpdates(){
+    return this.cqQuery.getVsdStats().getNumUpdates();
+  }
+  
+  /**
+   * Returns the total number of events for this CQ.
+   * @return the total number of insert, update, and delete events
+   */
+  public long numEvents(){
+    return cqQuery.getVsdStats().getNumEvents();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java
new file mode 100644
index 0000000..385c0a5
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl.java
@@ -0,0 +1,639 @@
+package com.gemstone.gemfire.cache.query.internal.cq;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.client.internal.UserAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesMutator;
+import com.gemstone.gemfire.cache.query.CqClosedException;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqResults;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.internal.CompiledBindArgument;
+import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef;
+import com.gemstone.gemfire.cache.query.internal.CompiledRegion;
+import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.i18n.StringId;
+
+public class ServerCQImpl extends CqQueryImpl implements DataSerializable, 
ServerCQ {
+  private static final Logger logger = LogService.getLogger();
+
+  /** 
+   * This holds the keys that are part of the CQ query results.
+   * Using this CQ engine can determine whether to execute 
+   * query on old value from EntryEvent, which is an expensive
+   * operation. 
+   */
+  private volatile HashMap<Object, Object> cqResultKeys;
+
+  /** 
+   * This maintains the keys that are destroyed while the Results
+   * Cache is getting constructed. This avoids any keys that are
+   * destroyed (after query execution) but is still part of the 
+   * CQs result.
+   */
+  private HashSet<Object> destroysWhileCqResultsInProgress;
+  
+  /**
+   * To indicate if the CQ results key cache is initialized.
+   */
+  public volatile boolean cqResultKeysInitialized = false;
+  
+  /** Boolean flag to see if the CQ is on Partitioned Region */
+  public volatile boolean isPR = false;
+  
+  private ClientProxyMembershipID clientProxyId = null;
+  
+  private CacheClientNotifier ccn = null;
+  
+  private String serverCqName;
+  
+  
+  /** identifier assigned to this query for FilterRoutingInfos */
+  private Long filterID;
+  
+  public ServerCQImpl(CqServiceImpl cqService, String cqName, String 
queryString, boolean isDurable, String serverCqName)  {
+    super(cqService, cqName, queryString, isDurable);
+    this.serverCqName = serverCqName; // On Client Side serverCqName and 
cqName will be same.
+  }
+  
+  public ServerCQImpl() {
+    //For deserialization
+  }
+
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#getFilterID()
+   */
+  @Override
+  public Long getFilterID() {
+    return this.filterID;
+  }
+
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#setFilterID(java.lang.Long)
+   */
+  @Override
+  public void setFilterID(Long filterID) {
+    this.filterID = filterID;
+  }
+  
+  @Override
+  public void setName(String cqName) {
+    this.cqName = this.serverCqName = cqName;
+  }
+  
+  @Override
+  public String getServerCqName() {
+    return this.serverCqName;
+  }
+  
+  @Override
+  public void registerCq(ClientProxyMembershipID p_clientProxyId, 
+      CacheClientNotifier p_ccn, int p_cqState) 
+      throws CqException, RegionNotFoundException {      
+    
+    CacheClientProxy clientProxy = null;    
+    this.clientProxyId = p_clientProxyId; 
+    //servConnection = serverSideConnection;
+
+    if (p_ccn != null) {
+      this.ccn = p_ccn;
+      clientProxy = p_ccn.getClientProxy(p_clientProxyId, true);
+    }
+    
+    /*
+    try {
+      initCq();
+    } catch (CqExistsException cqe) {
+      // Should not happen.
+      throw new 
CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_CREATE_CQ_0_ERROR__1.toLocalizedString(new
 Object[] { cqName, cqe.getMessage()}));
+    }
+    */
+    
+    validateCq();
+    
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    StringId msg = LocalizedStrings.ONE_ARG;
+    Throwable t = null;
+    try {
+      this.query = constructServerSideQuery();
+      if (isDebugEnabled) {
+        logger.debug("Server side query for the cq: {} is: {}", cqName, 
this.query.getQueryString());
+      }
+    } catch (Exception ex) {
+      t = ex;
+      if (ex instanceof ClassNotFoundException) {
+        msg = 
LocalizedStrings.CqQueryImpl_CLASS_NOT_FOUND_EXCEPTION_THE_ANTLRJAR_OR_THE_SPCIFIED_CLASS_MAY_BE_MISSING_FROM_SERVER_SIDE_CLASSPATH_ERROR_0;
+      } else {
+        msg = 
LocalizedStrings.CqQueryImpl_ERROR_WHILE_PARSING_THE_QUERY_ERROR_0;
+      }
+    } finally {
+      if (t != null) {
+        String s = msg.toLocalizedString(t);
+        if (isDebugEnabled) {
+          logger.debug(s, t);
+        }
+        throw new CqException(s);
+      }
+    }
+    
+    // Update Regions Book keeping.
+    // TODO replace getRegion() with getRegionByPathForProcessing() so this 
doesn't block
+    // if the region is still being initialized
+    this.cqBaseRegion = 
(LocalRegion)cqService.getCache().getRegion(regionName); 
+    if (this.cqBaseRegion == null) {
+      throw new 
RegionNotFoundException(LocalizedStrings.CqQueryImpl_REGION__0_SPECIFIED_WITH_CQ_NOT_FOUND_CQNAME_1
+          .toLocalizedString(new Object[] {regionName, this.cqName}));
+    }
+    
+    // Make sure that the region is partitioned or 
+    // replicated with distributed ack or global.
+    DataPolicy dp = this.cqBaseRegion.getDataPolicy();
+    this.isPR = dp.withPartitioning();
+    if (!(this.isPR || dp.withReplication())) {
+      String errMsg = null;
+      //replicated regions with eviction set to local destroy get turned into 
preloaded
+      if (dp.withPreloaded() && 
cqBaseRegion.getAttributes().getEvictionAttributes() != null && 
cqBaseRegion.getAttributes().getEvictionAttributes().getAction().equals(EvictionAction.LOCAL_DESTROY))
 {
+        errMsg = 
LocalizedStrings.CqQueryImpl_CQ_NOT_SUPPORTED_FOR_REPLICATE_WITH_LOCAL_DESTROY.toString(this.regionName,
 cqBaseRegion.getAttributes().getEvictionAttributes().getAction());
+      }
+      else {
+        errMsg = "The region " + this.regionName + 
+            "  specified in CQ creation is neither replicated nor partitioned; 
" +
+        "only replicated or partitioned regions are allowed in CQ creation.";
+      }
+      if (isDebugEnabled){
+        logger.debug(errMsg);
+      }
+      throw new CqException(errMsg);
+    }
+    if ((dp.withReplication() && 
+        (!(cqBaseRegion.getAttributes().getScope().isDistributedAck() || 
+        cqBaseRegion.getAttributes().getScope().isGlobal())))) {
+      String errMsg = "The replicated region " + this.regionName + 
+          " specified in CQ creation does not have scope supported by CQ." +
+          " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL.";
+      if (isDebugEnabled){
+        logger.debug(errMsg);
+      }
+      throw new CqException(errMsg);
+    }
+    
+    //checkAndSetCqOnRegion();
+    
+    //Can be null by the time we are here
+    if (clientProxy != null) {
+      clientProxy.incCqCount();
+      if (clientProxy.hasOneCq()) {
+        cqService.stats.incClientsWithCqs();
+      }
+      if (isDebugEnabled) {
+        logger.debug("Added CQ to the base region: {} With key as: {}", 
cqBaseRegion.getFullPath(), serverCqName);
+      }
+    }
+    
+    // this.cqService.addToCqEventKeysMap(this);
+    this.updateCqCreateStats();
+    
+    // Initialize the state of CQ.
+    if(this.cqState.getState() != p_cqState) {
+      setCqState(p_cqState);
+    }
+    
+    //Register is called from both filter profile and cqService
+    //In either case, if we are trying to start/run the cq, we need to add
+    //it to other matching cqs for performance reasons
+    if (p_cqState == CqStateImpl.RUNNING) {
+         // Add to the matchedCqMap.
+        cqService.addToMatchingCqMap(this);    
+    }
+
+    // Initialize CQ results (key) cache.
+    if(CqServiceProvider.MAINTAIN_KEYS) {
+      this.cqResultKeys = new HashMap <Object, Object>();
+      // Currently the CQ Result keys are not cached for the Partitioned 
+      // Regions. Supporting this with PR needs more work like forcing 
+      // query execution on primary buckets only; and handling the bucket
+      // re-balancing. Once this is added remove the check with PR region.
+      // Only the events which are seen during event processing is 
+      // added to the results cache (not from the CQ Results).
+      if (this.isPR){
+        this.setCqResultsCacheInitialized();
+      } else {
+        this.destroysWhileCqResultsInProgress = new HashSet <Object>();
+      }
+    } 
+
+    if (p_ccn != null) {
+      try {
+        cqService.addToCqMap(this);
+      } catch (CqExistsException cqe) {
+        // Should not happen.
+        throw new 
CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_CREATE_CQ_0_ERROR__1.toLocalizedString(new
 Object[] { cqName, cqe.getMessage()}));
+      }
+      this.cqBaseRegion.getFilterProfile().registerCq(this);
+    }
+  }
+  
+  /**
+   * For Test use only.
+   * @return CQ Results Cache.
+   */
+  public Set<Object> getCqResultKeyCache() {
+    if (this.cqResultKeys != null){
+      synchronized (this.cqResultKeys) {
+        return Collections.synchronizedSet(new 
HashSet<Object>(this.cqResultKeys.keySet()));
+      }
+    } else {
+      return null;
+    }
+  }
+  
+  /**
+   * Returns parameterized query used by the server.
+   * This method replaces Region name with $1 and if type is not specified
+   * in the query, looks for type from cqattributes and appends into the
+   * query.
+   * @return String modified query.
+   * @throws CqException
+   */
+  private Query constructServerSideQuery() throws QueryException {
+    GemFireCacheImpl cache = (GemFireCacheImpl)cqService.getCache();
+    DefaultQuery locQuery = 
(DefaultQuery)cache.getLocalQueryService().newQuery(this.queryString);      
+    CompiledSelect select = locQuery.getSimpleSelect();
+    CompiledIteratorDef from = 
(CompiledIteratorDef)select.getIterators().get(0);
+    // WARNING: ASSUMES QUERY WAS ALREADY VALIDATED FOR PROPER "FORM" ON 
CLIENT;
+    // THIS VALIDATION WILL NEED TO BE DONE ON THE SERVER FOR NATIVE CLIENTS,
+    // BUT IS NOT DONE HERE FOR JAVA CLIENTS.
+    // The query was already checked on the client that the sole iterator is a
+    // CompiledRegion
+    this.regionName = 
((CompiledRegion)from.getCollectionExpr()).getRegionPath();
+    from.setCollectionExpr(new CompiledBindArgument(1));
+    return locQuery;    
+  }
+  
+  /**
+   * Returns if the passed key is part of the CQs result set.
+   * This method needs to be called once the CQ result key caching
+   * is completed (cqResultsCacheInitialized is true).
+   * @param key
+   * @return true if key is in the Results Cache.
+   */
+  public boolean isPartOfCqResult(Object key) {
+    // Handle events that may have been deleted,
+    // but added by result caching.
+    if (this.cqResultKeys == null) {
+      
logger.warn(LocalizedMessage.create(LocalizedStrings.CqQueryImpl_Null_CQ_Result_Key_Cache_0));
+      return false;
+    }
+
+    synchronized (this.cqResultKeys) {
+      if (this.destroysWhileCqResultsInProgress != null) {
+        //this.logger.fine("Removing keys from Destroy Cache  For CQ :" + 
+        //this.cqName + " Keys :" + this.destroysWhileCqResultsInProgress);
+        for (Object k : this.destroysWhileCqResultsInProgress){
+          this.cqResultKeys.remove(k);  
+        }
+        this.destroysWhileCqResultsInProgress = null;
+      }
+      return this.cqResultKeys.containsKey(key);
+    }
+  }
+    
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#addToCqResultKeys(java.lang.Object)
+   */
+  @Override
+  public void addToCqResultKeys(Object key) {
+    if (!CqServiceProvider.MAINTAIN_KEYS){
+      return;
+    }
+    
+    //this.logger.fine("Adding key to Results Cache For CQ :" + 
+    //this.cqName + " key :" + key);
+    if (this.cqResultKeys != null) {
+      synchronized (this.cqResultKeys) { 
+        this.cqResultKeys.put(key, TOKEN);
+        if (!this.cqResultKeysInitialized){
+          // This key could be coming after add, destroy.
+          // Remove this from destroy queue.
+          //this.logger.fine("Removing key from Destroy Cache For CQ :" + 
+          //this.cqName + " key :" + key);
+          if (this.destroysWhileCqResultsInProgress != null){
+            this.destroysWhileCqResultsInProgress.remove(key);
+          }
+        }
+      }
+    }
+  }
+
+  
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#removeFromCqResultKeys(java.lang.Object,
 boolean)
+   */
+  @Override
+  public void removeFromCqResultKeys(Object key, boolean isTokenMode) {
+    if (!CqServiceProvider.MAINTAIN_KEYS){
+      return;
+    }
+    //this.logger.fine("Removing key from Results Cache For CQ :" + 
+    //this.cqName + " key :" + key);
+    if (this.cqResultKeys != null) {
+      synchronized (this.cqResultKeys) { 
+        if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED){
+          return;
+        }
+        this.cqResultKeys.remove(key);
+        if (!this.cqResultKeysInitialized){
+          //this.logger.fine("Adding key to Destroy Cache For CQ :" + 
+          //this.cqName + " key :" + key);
+          if (this.destroysWhileCqResultsInProgress != null){
+            this.destroysWhileCqResultsInProgress.add(key);
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Marks the key as destroyed in the CQ Results key cache.
+   * @param key
+   */
+  public void markAsDestroyedInCqResultKeys(Object key){
+    if (!CqServiceProvider.MAINTAIN_KEYS){
+      return;
+    }
+    //this.logger.fine("Marking key in Results Cache For CQ :" + 
+    //    this.cqName + " key :" + key);
+
+    if (this.cqResultKeys != null) {
+      synchronized (this.cqResultKeys) { 
+        this.cqResultKeys.put(key, Token.DESTROYED);
+        if (!this.cqResultKeysInitialized){
+          //this.logger.fine("Adding key to Destroy Cache For CQ :" + 
+          //this.cqName + " key :" + key);
+          if (this.destroysWhileCqResultsInProgress != null){
+            this.destroysWhileCqResultsInProgress.add(key);
+          }
+        }
+      }
+    }    
+  }
+  
+  
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#setCqResultsCacheInitialized()
+   */
+  @Override
+  public void setCqResultsCacheInitialized() {
+    if (CqServiceProvider.MAINTAIN_KEYS) {
+      this.cqResultKeysInitialized = true;
+    }
+  }
+   
+  /**
+   * Returns the size of the CQ Result key cache.
+   * @return size of CQ Result key cache.
+   */
+  public int getCqResultKeysSize() {
+    if (this.cqResultKeys == null) {
+      return 0;
+    }
+    synchronized (this.cqResultKeys) { 
+      return this.cqResultKeys.size();
+    }
+  }
+  
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#isOldValueRequiredForQueryProcessing(java.lang.Object)
+   */
+  @Override
+  public boolean isOldValueRequiredForQueryProcessing(Object key){
+    if (this.cqResultKeysInitialized && this.isPartOfCqResult(key)) {
+      return false;
+    }
+    return true;
+  }
+  
+  /**
+   * Closes the Query.
+   *        On Client side, sends the cq close request to server.
+   *        On Server side, takes care of repository cleanup.
+   * @throws CqException
+   */
+  public void close() throws CqClosedException, CqException {
+    close(true);
+  }
+  
+  /* (non-Javadoc)
+   * @see 
com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#close(boolean)
+   */
+  @Override
+  public void close(boolean sendRequestToServer) throws CqClosedException, 
CqException {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("Started closing CQ CqName: {} SendRequestToServer: {}", 
cqName, sendRequestToServer);
+    }   
+    // Synchronize with stop and execute CQ commands
+    synchronized(this.cqState) {
+      // Check if the cq is already closed.
+      if (this.isClosed()) {
+        //throw new CqClosedException("CQ is already closed, CqName : " + 
this.cqName);
+        if (isDebugEnabled){
+          logger.debug("CQ is already closed, CqName: {}", this.cqName);
+        }
+        return;
+      }
+
+      int stateBeforeClosing = this.cqState.getState();
+      this.cqState.setState(CqStateImpl.CLOSING);      
+      boolean isClosed = false;
+
+      // Cleanup the resource used by cq.
+      this.removeFromCqMap(); 
+
+      // Stat update.
+      if (stateBeforeClosing == CqStateImpl.RUNNING) {
+        cqService.stats.decCqsActive();
+      } else if (stateBeforeClosing == CqStateImpl.STOPPED) {
+        cqService.stats.decCqsStopped();
+      }
+
+      // Clean-up the CQ Results Cache.
+      if (this.cqResultKeys != null) {
+        synchronized (this.cqResultKeys){
+          this.cqResultKeys.clear();
+        }
+      }
+
+      // Set the state to close, and update stats
+      this.cqState.setState(CqStateImpl.CLOSED);
+      cqService.stats.incCqsClosed();
+      cqService.stats.decCqsOnClient();
+      if(this.stats != null)
+        this.stats.close();
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("Successfully closed the CQ. {}", cqName);
+    }
+  }
+  
+  @Override
+  public ClientProxyMembershipID getClientProxyId() {
+    return this.clientProxyId;
+  }
+  
+  /**
+   * Get CacheClientNotifier of this CqQuery.
+   * @return CacheClientNotifier 
+   */
+  public CacheClientNotifier getCacheClientNotifier() {
+    return this.ccn;
+  }
+  
+  /**
+   * Clears the resource used by CQ.
+   * @throws CqException
+   */
+  protected void cleanup() throws CqException {
+    // CqBaseRegion 
+    try {
+      if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) {
+        this.cqBaseRegion.getFilterProfile().closeCq(this);
+        CacheClientProxy clientProxy = ccn.getClientProxy(clientProxyId);
+        clientProxy.decCqCount();
+        if (clientProxy.hasNoCq()) {
+          cqService.stats.decClientsWithCqs();
+        }
+      } 
+    }catch (Exception ex){
+      // May be cache is being shutdown
+      if (logger.isDebugEnabled()) {
+        logger.debug("Failed to remove CQ from the base region. CqName :{}", 
cqName);
+      }
+    }
+  }
+  
+  /**
+   * @param serverCqName The serverCqName to set.
+   */
+  public void setServerCqName(String serverCqName) {
+    
+    this.serverCqName = serverCqName;
+  }
+  
+  /**
+   * Stop or pause executing the query.
+   */
+  public void stop()throws CqClosedException, CqException {
+    boolean isStopped = false;
+    synchronized (this.cqState) {
+      if (this.isClosed()) {
+        throw new 
CqClosedException(LocalizedStrings.CqQueryImpl_CQ_IS_CLOSED_CQNAME_0
+            .toLocalizedString(this.cqName));
+      }
+      
+      if (!(this.isRunning())) {
+        throw new 
IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0
+            .toLocalizedString(this.cqName));
+      }
+      
+      // Change state and stats on the client side
+      this.cqState.setState(CqStateImpl.STOPPED);
+      this.cqService.stats.incCqsStopped();
+      this.cqService.stats.decCqsActive();
+      if (logger.isDebugEnabled()) {
+        logger.debug("Successfully stopped the CQ. {}", cqName);
+      }
+    }
+  }
+
+  /* DataSerializableFixedID methods ---------------------------------------- 
*/
+
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    //this.cqName = DataSerializer.readString(in);
+    synchronized(cqState) {
+      this.cqState.setState(DataSerializer.readInteger(in));
+    }
+    this.isDurable = DataSerializer.readBoolean(in);
+    this.queryString = DataSerializer.readString(in);
+    this.filterID = in.readLong();
+  }
+  
+  /*
+  public int getDSFID() {
+    return CQ_QUERY;
+  }
+  */
+  
+  public void toData(DataOutput out) throws IOException {
+    //DataSerializer.writeString(this.cqName, out);
+    DataSerializer.writeInteger(this.cqState.getState(), out);
+    DataSerializer.writeBoolean(this.isDurable, out);
+    DataSerializer.writeString(this.queryString, out);
+    out.writeLong(this.filterID);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#isPR()
+   */
+  @Override
+  public boolean isPR() {
+    return isPR;
+  }
+
+  @Override
+  public CqAttributes getCqAttributes() {
+    throw new IllegalStateException("CQ attributes are not available on the 
server");
+  }
+
+  @Override
+  public CqAttributesMutator getCqAttributesMutator() {
+    throw new IllegalStateException("CQ attributes are not available on the 
server");
+  }
+
+  @Override
+  public <E> CqResults<E> executeWithInitialResults() throws CqClosedException,
+      RegionNotFoundException, CqException {
+    throw new IllegalStateException("Execute cannot be called on a CQ on the 
server");
+  }
+
+  @Override
+  public void execute() throws CqClosedException, RegionNotFoundException,
+      CqException {
+    throw new IllegalStateException("Execute cannot be called on a CQ on the 
server");
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java
new file mode 100644
index 0000000..85e886b
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/BaseCQCommand.java
@@ -0,0 +1,43 @@
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.operations.QueryOperationContext;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.internal.CqEntry;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache.query.internal.types.CollectionTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.types.CollectionType;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommandQuery;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ObjectPartList;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
+
+public abstract class BaseCQCommand extends BaseCommandQuery {
+
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java
new file mode 100644
index 0000000..4f8b2ef
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseCQ.java
@@ -0,0 +1,125 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.*;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class CloseCQ extends BaseCQCommand {
+
+  private final static CloseCQ singleton = new CloseCQ();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private CloseCQ() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException {
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    ClientProxyMembershipID id = servConn.getProxyID();
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    // Based on MessageType.QUERY
+    // Added by Rao 2/1/2007
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+
+    start = DistributionStats.getStatTime();
+    // Retrieve the data from the message parts
+    String cqName = msg.getPart(0).getString();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received close CQ request from {} cqName: {}", 
servConn.getName(), servConn.getSocketString(), cqName);
+    }
+
+    // Process the query request
+    if (cqName == null) {
+      String err = 
LocalizedStrings.CloseCQ_THE_CQNAME_FOR_THE_CQ_CLOSE_REQUEST_IS_NULL.toLocalizedString();
+      sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+          .getTransactionId(), null, servConn);
+      return;
+    }
+
+    // Process CQ close request
+    try {
+      // Append Client ID to CQ name
+      CqService cqService = crHelper.getCache().getCqService();
+      cqService.start();
+      
+      String serverCqName = cqName;
+      if (id != null) {
+        serverCqName = cqService.constructServerCqName(cqName, id);
+      }
+      InternalCqQuery cqQuery = cqService.getCq(serverCqName);
+      
+      AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+      if (authzRequest != null) {
+        String queryStr = null;
+        Set cqRegionNames = null;        
+        
+        if (cqQuery != null) {
+          queryStr = cqQuery.getQueryString();
+          cqRegionNames = new HashSet();
+          cqRegionNames.add(((InternalCqQuery)cqQuery).getRegionName());
+          authzRequest.closeCQAuthorize(cqName, queryStr, cqRegionNames);
+        }
+        
+      }
+      // String cqNameWithClientId = new String(cqName + "__" +
+      // getMembershipID());
+      cqService.closeCq(cqName, id);
+      if(cqQuery != null)
+        servConn.removeCq(cqName, cqQuery.isDurable());
+    }
+    catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      return;
+    }
+    catch (Exception e) {
+      String err = 
LocalizedStrings.CloseCQ_EXCEPTION_WHILE_CLOSING_CQ_CQNAME_0.toLocalizedString(cqName);
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err,
+          msg.getTransactionId(), e, servConn);
+      return;
+    }
+
+    // Send OK to client
+    sendCqResponse(MessageType.REPLY, 
LocalizedStrings.CloseCQ_CQ_CLOSED_SUCCESSFULLY.toLocalizedString(), 
msg.getTransactionId(), null, servConn);
+    servConn.setAsTrue(RESPONDED);
+
+    {
+      long oldStart = start;
+      start = DistributionStats.getStatTime();
+      stats.incProcessCloseCqTime(start - oldStart);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java
new file mode 100644
index 0000000..9f111fc
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ.java
@@ -0,0 +1,162 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.*;
+import com.gemstone.gemfire.cache.operations.ExecuteCQOperationContext;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+
+public class ExecuteCQ extends BaseCQCommand {
+  protected static final Logger logger = LogService.getLogger();
+
+  private final static ExecuteCQ singleton = new ExecuteCQ();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private ExecuteCQ() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException, InterruptedException {
+    AcceptorImpl acceptor = servConn.getAcceptor();
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    ClientProxyMembershipID id = servConn.getProxyID();
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+    
+    // Retrieve the data from the message parts
+    String cqName = msg.getPart(0).getString();
+    String cqQueryString = msg.getPart(1).getString();
+    int cqState = msg.getPart(2).getInt();
+
+    Part isDurablePart = msg.getPart(3);
+    byte[] isDurableByte = isDurablePart.getSerializedForm();
+    boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? 
false
+        : true;
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received {} request from {} CqName: {} queryString: 
{}", servConn.getName(), MessageType.getString(msg.getMessageType()), 
servConn.getSocketString(), cqName, cqQueryString);
+    }
+
+    DefaultQueryService qService = null;
+    CqService cqServiceForExec = null;
+    Query query = null;
+    Set cqRegionNames = null;
+    ExecuteCQOperationContext executeCQContext = null;
+    ServerCQ cqQuery = null;
+    
+    try {
+      qService = 
(DefaultQueryService)((GemFireCacheImpl)crHelper.getCache()).getLocalQueryService();
+
+      // Authorization check
+      AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+      if (authzRequest != null) {
+        query = qService.newQuery(cqQueryString);
+        cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+        executeCQContext = authzRequest.executeCQAuthorize(cqName,
+            cqQueryString, cqRegionNames);
+        String newCqQueryString = executeCQContext.getQuery();
+        
+        if (!cqQueryString.equals(newCqQueryString)) {
+          query = qService.newQuery(newCqQueryString);
+          cqQueryString = newCqQueryString;
+          cqRegionNames = executeCQContext.getRegionNames();
+          if (cqRegionNames == null) {
+            cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+          }
+        }
+      }
+
+      cqServiceForExec = qService.getCqService();
+      cqQuery = cqServiceForExec.executeCq(cqName, cqQueryString, 
+        cqState, id, acceptor.getCacheClientNotifier(), isDurable, false, 0, 
null);
+    }
+    catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      return;
+    }
+    catch (Exception e) {
+      writeChunkedException(msg, e, false, servConn);
+      return;
+    }
+
+    long oldstart = start;
+    boolean sendResults = false;
+    boolean successQuery = false;
+    
+    if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
+      sendResults = true;
+    }
+
+    // Execute the query and send the result-set to client.    
+    try {
+      if (query == null) {
+        query = qService.newQuery(cqQueryString);
+        cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+      }
+      ((DefaultQuery)query).setIsCqQuery(true);
+      successQuery = processQuery(msg, query, cqQueryString,
+          cqRegionNames, start, cqQuery, executeCQContext, servConn, 
sendResults);
+
+      // Update the CQ statistics.
+      
cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) 
- oldstart);
+      stats.incProcessExecuteCqWithIRTime((DistributionStats.getStatTime()) - 
oldstart);
+      //logger.fine("Time spent in execute with initial results :" + 
DistributionStats.getStatTime() + ", " +  oldstart);
+    } finally { // To handle any exception.
+      // If failure to execute the query, close the CQ.
+      if (!successQuery) {
+        try {
+          cqServiceForExec.closeCq(cqName, id);
+        }
+        catch (Exception ex) {
+          // Ignore.
+        }        
+      }
+    }
+
+    if (!sendResults && successQuery) {
+      // Send OK to client
+      sendCqResponse(MessageType.REPLY, 
LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), msg
+          .getTransactionId(), null, servConn);
+
+      long start2 = DistributionStats.getStatTime();
+      stats.incProcessCreateCqTime(start2 - oldstart);
+    }
+    servConn.setAsTrue(RESPONDED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java
new file mode 100755
index 0000000..8e19ef3
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteCQ61.java
@@ -0,0 +1,214 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.operations.ExecuteCQOperationContext;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.cache.vmotion.VMotionObserver;
+import com.gemstone.gemfire.internal.cache.vmotion.VMotionObserverHolder;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+
+/**
+ * @author aingle
+ * @since 6.1
+ */
+
+public class ExecuteCQ61 extends BaseCQCommand {
+  protected static final Logger logger = LogService.getLogger();
+
+  private final static ExecuteCQ61 singleton = new ExecuteCQ61();
+  
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private ExecuteCQ61() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException, InterruptedException {
+    AcceptorImpl acceptor = servConn.getAcceptor();
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    ClientProxyMembershipID id = servConn.getProxyID();
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+    
+    // Retrieve the data from the message parts
+    String cqName = msg.getPart(0).getString();
+    String cqQueryString = msg.getPart(1).getString();
+    int cqState = msg.getPart(2).getInt();
+
+    Part isDurablePart = msg.getPart(3);
+    byte[] isDurableByte = isDurablePart.getSerializedForm();
+    boolean isDurable = (isDurableByte == null || isDurableByte[0] == 0) ? 
false
+        : true;
+//  region data policy
+    Part regionDataPolicyPart = msg.getPart(msg.getNumberOfParts()-1);
+    byte[] regionDataPolicyPartBytes = 
regionDataPolicyPart.getSerializedForm();
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received {} request from {} CqName: {} queryString: 
{}", servConn.getName(), MessageType.getString(msg.getMessageType()), 
servConn.getSocketString(), cqName, cqQueryString);
+    }
+
+    // Check if the Server is running in NotifyBySubscription=true mode.
+    CacheClientNotifier ccn = acceptor.getCacheClientNotifier();
+    if (ccn != null) {
+      CacheClientProxy proxy = ccn.getClientProxy(id);
+      if (proxy != null && !proxy.isNotifyBySubscription()) {
+        // This should have been taken care at the client.
+        String err = 
LocalizedStrings.ExecuteCQ_SERVER_NOTIFYBYSUBSCRIPTION_MODE_IS_SET_TO_FALSE_CQ_EXECUTION_IS_NOT_SUPPORTED_IN_THIS_MODE.toLocalizedString();
+        sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+          .getTransactionId(), null, servConn);
+        return;
+      }
+    }
+
+    DefaultQueryService qService = null;
+    CqServiceImpl cqServiceForExec = null;
+    Query query = null;
+    Set cqRegionNames = null;
+    ExecuteCQOperationContext executeCQContext = null;
+    ServerCQImpl cqQuery = null;
+    
+    try {
+      qService = 
(DefaultQueryService)((GemFireCacheImpl)crHelper.getCache()).getLocalQueryService();
+
+      // Authorization check
+      AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+      if (authzRequest != null) {
+        query = qService.newQuery(cqQueryString);
+        cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+        executeCQContext = authzRequest.executeCQAuthorize(cqName,
+            cqQueryString, cqRegionNames);
+        String newCqQueryString = executeCQContext.getQuery();
+        
+        if (!cqQueryString.equals(newCqQueryString)) {
+          query = qService.newQuery(newCqQueryString);
+          cqQueryString = newCqQueryString;
+          cqRegionNames = executeCQContext.getRegionNames();
+          if (cqRegionNames == null) {
+            cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+          }
+        }
+      }
+      
+      // test hook to trigger vMotion during CQ registration
+
+      if (CqServiceProvider.VMOTION_DURING_CQ_REGISTRATION_FLAG) {
+        VMotionObserver vmo = VMotionObserverHolder.getInstance();
+        vmo.vMotionBeforeCQRegistration();
+      }
+
+      cqServiceForExec = (CqServiceImpl) qService.getCqService();
+      //registering cq with serverConnection so that when CCP will require 
auth info it can access that
+      //registering cq auth before as possibility that you may get event
+      servConn.setCq(cqName, isDurable);
+      cqQuery = (ServerCQImpl) cqServiceForExec.executeCq(cqName, 
cqQueryString, cqState, 
+        id, ccn, isDurable, true, regionDataPolicyPartBytes[0], null);  
+    }
+    catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      servConn.removeCq(cqName, isDurable);
+      return;
+    }
+    catch (Exception e) {
+      writeChunkedException(msg, e, false, servConn);
+      servConn.removeCq(cqName, isDurable);
+      return;
+    }
+
+    long oldstart = start;
+    boolean sendResults = false;
+    boolean successQuery = false;
+
+    if (msg.getMessageType() == MessageType.EXECUTECQ_WITH_IR_MSG_TYPE) {
+      sendResults = true;
+    }
+    
+    // Execute the query only if it is execute with initial results or
+    // if it is a non PR query with execute query and maintain keys flags set
+    if(sendResults || (CqServiceImpl.EXECUTE_QUERY_DURING_INIT && 
CqServiceProvider.MAINTAIN_KEYS && !cqQuery.isPR())) {
+      // Execute the query and send the result-set to client.
+      try {
+        if (query == null) {
+          query = qService.newQuery(cqQueryString);
+          cqRegionNames = ((DefaultQuery)query).getRegionsInQuery(null);
+        }
+        ((DefaultQuery)query).setIsCqQuery(true);
+        successQuery = processQuery(msg, query, cqQueryString,
+            cqRegionNames, start, cqQuery, executeCQContext, servConn, 
sendResults);
+      
+
+        // Update the CQ statistics.
+        
cqQuery.getVsdStats().setCqInitialResultsTime((DistributionStats.getStatTime()) 
- oldstart);
+        stats.incProcessExecuteCqWithIRTime((DistributionStats.getStatTime()) 
- oldstart);
+        //logger.fine("Time spent in execute with initial results :" + 
DistributionStats.getStatTime() + ", " +  oldstart);
+      } finally { // To handle any exception.
+        // If failure to execute the query, close the CQ.
+        if (!successQuery) {
+          try {
+            cqServiceForExec.closeCq(cqName, id);
+          }
+          catch (Exception ex) {
+            // Ignore.
+          }      
+        }
+      }
+    } else {
+      // Don't execute query for cq.execute and
+      // if it is a PR query with execute query and maintain keys flags not set
+      cqQuery.cqResultKeysInitialized = true;
+      successQuery = true;
+    }
+      
+    if (!sendResults && successQuery) {
+      // Send OK to client
+      sendCqResponse(MessageType.REPLY, 
LocalizedStrings.ExecuteCQ_CQ_CREATED_SUCCESSFULLY.toLocalizedString(), msg
+          .getTransactionId(), null, servConn);
+
+      long start2 = DistributionStats.getStatTime();
+      stats.incProcessCreateCqTime(start2 - oldstart);
+    }
+    servConn.setAsTrue(RESPONDED);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java
new file mode 100644
index 0000000..b483ddf
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetCQStats.java
@@ -0,0 +1,94 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.*;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+
+import java.io.IOException;
+
+
+public class GetCQStats extends BaseCQCommand {
+
+  private final static GetCQStats singleton = new GetCQStats();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private GetCQStats() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException {
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("{}: Received close all client CQs request from {}", 
servConn.getName(), servConn.getSocketString());
+    }
+
+    // Retrieve the data from the message parts
+    String cqName = msg.getPart(0).getString();
+
+    if (isDebugEnabled) {
+      logger.debug("{}: Received close CQ request from {} cqName: {}", 
servConn.getName(), servConn.getSocketString(), cqName);
+    }
+
+    // Process the query request
+    if (cqName == null) {
+      String err = "The cqName for the cq stats request is null";
+      sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+          .getTransactionId(), null, servConn);
+      return;
+
+    }
+    else {
+      // Process the cq request
+      try {
+        // make sure the cqservice has been created
+        // since that is what registers the stats
+        CqService cqService = crHelper.getCache().getCqService();
+        cqService.start();
+      }
+      catch (Exception e) {
+        String err = "Exception while Getting the CQ Statistics. ";
+        sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err, msg
+            .getTransactionId(), e, servConn);
+        return;
+      }
+    }
+    // Send OK to client
+    sendCqResponse(MessageType.REPLY, "cq stats sent successfully.", msg
+        .getTransactionId(), null, servConn);
+    servConn.setAsTrue(RESPONDED);
+
+    {
+      long oldStart = start;
+      start = DistributionStats.getStatTime();
+      stats.incProcessGetCqStatsTime(start - oldStart);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java
new file mode 100755
index 0000000..f22d141
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/GetDurableCQs.java
@@ -0,0 +1,137 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.operations.GetDurableCQsOperationContext;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+
+public class GetDurableCQs extends BaseCQCommand {
+
+  private final static GetDurableCQs singleton = new GetDurableCQs();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private GetDurableCQs() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException, InterruptedException {
+    AcceptorImpl acceptor = servConn.getAcceptor();
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    ClientProxyMembershipID id = servConn.getProxyID();
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received {} request from {}", servConn.getName(), 
MessageType.getString(msg.getMessageType()), servConn.getSocketString());
+    }
+
+    DefaultQueryService qService = null;
+    CqService cqServiceForExec = null;
+    Query query = null;
+    Set cqRegionNames = null;
+    GetDurableCQsOperationContext getDurableCqsOperationContext = null;
+    InternalCqQuery cqQuery = null;
+
+    try {
+      qService = (DefaultQueryService) ((GemFireCacheImpl) crHelper.getCache())
+          .getLocalQueryService();
+
+      // Authorization check
+      AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+      if (authzRequest != null) {
+        authzRequest.getDurableCQsAuthorize();
+      }
+
+      cqServiceForExec = qService.getCqService();
+      List<String> durableCqs = cqServiceForExec.getAllDurableClientCqs(id);
+
+      ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+      chunkedResponseMsg.setMessageType(MessageType.RESPONSE);
+      chunkedResponseMsg.setTransactionId(msg.getTransactionId());
+      chunkedResponseMsg.sendHeader();
+
+      List durableCqList = new ArrayList(maximumChunkSize);
+      final boolean isTraceEnabled = logger.isTraceEnabled();
+      for (Iterator it = durableCqs.iterator(); it.hasNext();) {
+        Object durableCqName = it.next();
+        durableCqList.add(durableCqName);
+        if (isTraceEnabled) {
+          logger.trace("{}: getDurableCqsResponse <{}>; list size was {}", 
servConn.getName(), durableCqName, durableCqList.size());
+        }
+        if (durableCqList.size() == maximumChunkSize) {
+          // Send the chunk and clear the list
+          sendDurableCqsResponseChunk(durableCqList, false, servConn);
+          durableCqList.clear();
+        }
+      }
+      // Send the last chunk even if the list is of zero size.
+      sendDurableCqsResponseChunk(durableCqList, true, servConn);
+
+    } catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      return;
+    } catch (Exception e) {
+      writeChunkedException(msg, e, false, servConn);
+      return;
+    }
+  }
+
+  private void sendDurableCqsResponseChunk(List list, boolean lastChunk,
+      ServerConnection servConn) throws IOException {
+    ChunkedMessage chunkedResponseMsg = servConn.getChunkedResponseMessage();
+
+    chunkedResponseMsg.setNumberOfParts(1);
+    chunkedResponseMsg.setLastChunk(lastChunk);
+    chunkedResponseMsg.addObjPart(list, zipValues);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Sending {} durableCQs response chunk{}", 
servConn.getName(), (lastChunk ? " last " : " "), (logger.isTraceEnabled() ? " 
keys=" + list + " chunk=<" + chunkedResponseMsg + ">" : ""));
+    }
+
+    chunkedResponseMsg.sendChunk(servConn);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java
new file mode 100644
index 0000000..bd77cee
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/MonitorCQ.java
@@ -0,0 +1,94 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.*;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+
+import java.io.IOException;
+
+public class MonitorCQ extends BaseCQCommand {
+
+  private final static MonitorCQ singleton = new MonitorCQ();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private MonitorCQ() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException {
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+
+    int op = msg.getPart(0).getInt();
+
+    if (op < 1) {
+      // This should have been taken care at the client - remove?
+      String err = 
LocalizedStrings.MonitorCQ__0_THE_MONITORCQ_OPERATION_IS_INVALID.toLocalizedString(servConn.getName());
+      sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+          .getTransactionId(), null, servConn);
+      return;
+    }
+
+    String regionName = null;
+    if (msg.getNumberOfParts() == 2) {
+      // This will be enable/disable on region.
+      regionName = msg.getPart(1).getString();
+      if (regionName == null) {
+        // This should have been taken care at the client - remove?
+        String err = 
LocalizedStrings.MonitorCQ__0_A_NULL_REGION_NAME_WAS_PASSED_FOR_MONITORCQ_OPERATION.toLocalizedString(servConn.getName());
+        sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+            .getTransactionId(), null, servConn);
+        return;
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received MonitorCq request from {} op: {}{}", 
servConn.getName(), servConn.getSocketString(), op, (regionName != null) ? " 
RegionName: " + regionName : "");
+    }
+
+    try {
+      CqService cqService = crHelper.getCache().getCqService();
+      cqService.start();
+      // The implementation of enable/disable cq is changed.
+      // Instead calling enable/disable client calls execute/stop methods
+      // at cache and region level.
+      // This method is retained for future purpose, to support admin level 
apis
+      // similar to enable/disable at system/client level.
+      // Should never come.
+      throw new CqException 
(LocalizedStrings.CqService_INVALID_CQ_MONITOR_REQUEST_RECEIVED.toLocalizedString());
+    }
+    catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      return;
+    }
+    catch (Exception e) {
+      String err = 
LocalizedStrings.MonitorCQ_EXCEPTION_WHILE_HANDLING_THE_MONITOR_REQUEST_OP_IS_0.toLocalizedString(Integer.valueOf(op));
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err,
+          msg.getTransactionId(), e, servConn);
+      return;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java
 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java
new file mode 100644
index 0000000..157158e
--- /dev/null
+++ 
b/gemfire-cq/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/StopCQ.java
@@ -0,0 +1,129 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
+import com.gemstone.gemfire.internal.cache.tier.Command;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.*;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import 
com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.security.AuthorizeRequest;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class StopCQ extends BaseCQCommand {
+
+  private final static StopCQ singleton = new StopCQ();
+
+  public static Command getCommand() {
+    return singleton;
+  }
+
+  private StopCQ() {
+  }
+
+  @Override
+  public void cmdExecute(Message msg, ServerConnection servConn, long start)
+      throws IOException {
+    CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
+    ClientProxyMembershipID id = servConn.getProxyID();
+    CacheServerStats stats = servConn.getCacheServerStats();
+
+    // Based on MessageType.QUERY
+    // Added by Rao 2/1/2007
+    servConn.setAsTrue(REQUIRES_RESPONSE);
+    servConn.setAsTrue(REQUIRES_CHUNKED_RESPONSE);
+
+    start = DistributionStats.getStatTime();
+    // Retrieve the data from the message parts
+    String cqName = msg.getPart(0).getString();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: Received stop CQ request from {} cqName: {}", 
servConn.getName(), servConn.getSocketString(), cqName);
+    }
+
+    // Process the query request
+    if (cqName == null) {
+      String err = 
LocalizedStrings.StopCQ_THE_CQNAME_FOR_THE_CQ_STOP_REQUEST_IS_NULL.toLocalizedString();
+      sendCqResponse(MessageType.CQDATAERROR_MSG_TYPE, err, msg
+          .getTransactionId(), null, servConn);
+      return;
+    }
+
+    // Process CQ stop request
+    try {
+      // Append Client ID to CQ name
+      CqService cqService = crHelper.getCache().getCqService();
+      cqService.start();
+      // String cqNameWithClientId = new String(cqName + "__" +
+      // getMembershipID());
+      String serverCqName = cqName;
+      if (id != null) {
+        serverCqName = cqService.constructServerCqName(cqName, id);
+      }
+      InternalCqQuery cqQuery = cqService.getCq(serverCqName);
+      
+      AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+      if (authzRequest != null) {
+        String queryStr = null;
+        Set cqRegionNames = null;
+        
+        if (cqQuery != null) {
+          queryStr = cqQuery.getQueryString();
+          cqRegionNames = new HashSet();
+          cqRegionNames.add(((CqQueryImpl)cqQuery).getRegionName());
+        }
+        authzRequest.stopCQAuthorize(cqName, queryStr, cqRegionNames);
+      }
+      cqService.stopCq(cqName, id);
+      if(cqQuery != null)
+        servConn.removeCq(cqName, cqQuery.isDurable());
+    }
+    catch (CqException cqe) {
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, "", msg.getTransactionId(),
+          cqe, servConn);
+      return;
+    }
+    catch (Exception e) {
+      String err = LocalizedStrings.StopCQ_EXCEPTION_WHILE_STOPPING_CQ_NAMED_0
+        .toLocalizedString(cqName);
+      sendCqResponse(MessageType.CQ_EXCEPTION_TYPE, err,
+          msg.getTransactionId(), e, servConn);
+      return;
+    }
+
+    // Send OK to client
+    sendCqResponse(MessageType.REPLY, 
LocalizedStrings.StopCQ_CQ_STOPPED_SUCCESSFULLY.toLocalizedString(), msg
+        .getTransactionId(), null, servConn);
+
+    servConn.setAsTrue(RESPONDED);
+
+    {
+      long oldStart = start;
+      start = DistributionStats.getStatTime();
+      stats.incProcessStopCqTime(start - oldStart);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory
 
b/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory
new file mode 100644
index 0000000..0c51c3d
--- /dev/null
+++ 
b/gemfire-cq/src/main/resources/META-INF/services/com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory
@@ -0,0 +1 @@
+com.gemstone.gemfire.cache.query.internal.cq.CqServiceFactoryImpl

Reply via email to