This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ffbc41  GEODE-6488: Migrating cancellation state to execution context 
(#3322)
3ffbc41 is described below

commit 3ffbc4183de59aa0603bea2e25f91a6e80a1d079
Author: Ryan McMahon <rmcma...@pivotal.io>
AuthorDate: Wed Mar 20 08:38:04 2019 -0700

    GEODE-6488: Migrating cancellation state to execution context (#3322)
    
    This work solves two problems.  One is that the query cancellation task
    reference in DefaultQuery could be overwritten and thus never removed
    from monitoring upon successful completion of a query.  Second is that
    after a query execution timed out, the query object was left in an
    unusable state, which is undesirable.
    
    The solution is to attach the cancellation state to the execution
    context rather than the query object, so that cancellation is associated
    with each independent execution of a query rather than having
    cancellation state that applies to the entire query object.
---
 .../dunit/QueryUsingFunctionContextDUnitTest.java  | 11 ++-
 .../ResourceManagerWithQueryMonitorDUnitTest.java  | 10 ++-
 .../cache/query/partitioned/PRQueryDUnitTest.java  | 14 +++-
 .../internal/cache/PRQueryDistributedTest.java     |  4 +-
 .../execute/LocalDataSetIndexingDUnitTest.java     |  7 +-
 .../apache/geode/cache/query/QueryJUnitTest.java   |  4 +-
 .../cache/query/QueryServiceRegressionTest.java    | 11 ++-
 .../QueryWithBucketParameterIntegrationTest.java   | 16 +++-
 .../internal/QueryMonitorIntegrationTest.java      | 73 ++++++++++++++---
 .../internal/index/CompactRangeIndexJUnitTest.java |  7 +-
 ...itionedRegionQueryEvaluatorIntegrationTest.java | 12 +--
 .../codeAnalysis/sanctionedDataSerializables.txt   |  2 +-
 .../MonitorQueryUnderContentionBenchmark.java      | 10 +--
 .../geode/cache/query/internal/DefaultQuery.java   | 95 +++++-----------------
 .../cache/query/internal/ExecutionContext.java     | 54 +++++++++++-
 .../query/internal/QueryExecutionContext.java      |  6 +-
 .../geode/cache/query/internal/QueryExecutor.java  |  5 +-
 .../geode/cache/query/internal/QueryMonitor.java   | 71 ++++++++--------
 .../cache/query/internal/index/HashIndex.java      |  2 +-
 .../query/internal/index/MemoryIndexStore.java     | 27 +++---
 .../internal/streaming/StreamingOperation.java     |  4 +-
 .../apache/geode/internal/cache/LocalDataSet.java  | 11 ++-
 .../geode/internal/cache/PRQueryProcessor.java     |  5 +-
 .../geode/internal/cache/PartitionedRegion.java    | 14 +++-
 .../cache/PartitionedRegionQueryEvaluator.java     | 45 +++++-----
 .../internal/cache/partitioned/QueryMessage.java   | 12 ++-
 .../cache/tier/sockets/BaseCommandQuery.java       | 40 ++++++---
 .../internal/beans/QueryDataFunction.java          |  5 +-
 .../internal/CompiledIteratorDefJUnitTest.java     |  2 +-
 .../cache/query/internal/QueryMonitorTest.java     | 22 ++---
 .../cache/PartitionedRegionQueryEvaluatorTest.java | 30 ++++---
 .../cache/query/dunit/QueryMonitorDUnitTest.java   | 90 ++++++++++++++++----
 .../cache/query/cq/internal/command/ExecuteCQ.java |  5 +-
 .../query/cq/internal/command/ExecuteCQ61.java     |  5 +-
 .../geode/test/junit/rules/LocatorStarterRule.java |  2 +-
 .../geode/test/junit/rules/ServerStarterRule.java  |  2 +-
 36 files changed, 467 insertions(+), 268 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
index b540c44..53b218d 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
@@ -52,10 +52,13 @@ import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.IndexTrackingQueryObserver;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
@@ -673,9 +676,11 @@ public class QueryUsingFunctionContextDUnitTest extends 
JUnit4CacheTestCase {
 
       try {
         Query query = queryService.newQuery((String) args[0]);
+        final ExecutionContext executionContext =
+            new QueryExecutionContext(null, (InternalCache) cache, query);
         context.getResultSender()
             .lastResult((ArrayList) ((SelectResults) ((LocalDataSet) 
localDataSet)
-                .executeQuery((DefaultQuery) query, null, buckets)).asList());
+                .executeQuery((DefaultQuery) query, executionContext, null, 
buckets)).asList());
       } catch (Exception e) {
         throw new FunctionException(e);
       }
@@ -866,9 +871,13 @@ public class QueryUsingFunctionContextDUnitTest extends 
JUnit4CacheTestCase {
     QueryService qservice = CacheFactory.getAnyInstance().getQueryService();
 
     Query query = qservice.newQuery(queryStr);
+    final ExecutionContext executionContext =
+        new QueryExecutionContext(null, (InternalCache) cache, query);
+
     SelectResults results;
     try {
       results = (SelectResults) ((LocalDataSet) 
localDataSet).executeQuery((DefaultQuery) query,
+          executionContext,
           null, buckets);
 
       return (ArrayList) results.asList();
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 7076acf..7faf25f 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -57,6 +57,7 @@ import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.TypeMismatchException;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.ClientServerTestCase;
@@ -1296,7 +1297,8 @@ public class ResourceManagerWithQueryMonitorDUnitTest 
extends ClientServerTestCa
     public boolean rejectedObjects = false;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case BEFORE_QUERY_EXECUTION:
           try {
@@ -1327,7 +1329,8 @@ public class ResourceManagerWithQueryMonitorDUnitTest 
extends ClientServerTestCa
     private int numObjectsBeforeCancel = 5;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION:
           rejectedObjects = true;
@@ -1350,7 +1353,8 @@ public class ResourceManagerWithQueryMonitorDUnitTest 
extends ClientServerTestCa
     public boolean rejectedObjects = false;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case BEFORE_BUILD_CUMULATIVE_RESULT:
           if (triggeredOOME == false) {
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitTest.java
index 55772ba..63fd597 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitTest.java
@@ -42,6 +42,8 @@ import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionQueryEvaluator;
 import org.apache.geode.test.dunit.VM;
@@ -113,6 +115,7 @@ public class PRQueryDUnitTest extends CacheTestCase {
 
     DefaultQuery query = (DefaultQuery) getCache().getQueryService()
         .newQuery("select distinct * from " + region.getFullPath());
+    final ExecutionContext executionContext = new QueryExecutionContext(null, 
getCache(), query);
     SelectResults results =
         query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, 
getCache(), query);
 
@@ -124,6 +127,7 @@ public class PRQueryDUnitTest extends CacheTestCase {
     PartitionedRegion partitionedRegion = (PartitionedRegion) region;
     PartitionedRegionQueryEvaluator queryEvaluator =
         new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), 
partitionedRegion, query,
+            executionContext,
             EMPTY_PARAMETERS, results, buckets);
 
     DisconnectingTestHook testHook = new DisconnectingTestHook();
@@ -164,13 +168,15 @@ public class PRQueryDUnitTest extends CacheTestCase {
 
       for (int i = 0; i < queries.length; i++) {
         DefaultQuery query = (DefaultQuery) 
getCache().getQueryService().newQuery(queries[i]);
+        final ExecutionContext executionContext =
+            new QueryExecutionContext(null, getCache(), query);
         SelectResults results =
             query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, 
getCache(), query);
 
         PartitionedRegion partitionedRegion = (PartitionedRegion) region;
         PartitionedRegionQueryEvaluator queryEvaluator =
             new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), 
partitionedRegion,
-                query, EMPTY_PARAMETERS, results, bucketsToQuery);
+                query, executionContext, EMPTY_PARAMETERS, results, 
bucketsToQuery);
 
         CollatingTestHook testHook = new CollatingTestHook(queryEvaluator);
         queryEvaluator.queryBuckets(testHook);
@@ -212,13 +218,14 @@ public class PRQueryDUnitTest extends CacheTestCase {
 
     for (int i = 0; i < queries.length; i++) {
       DefaultQuery query = (DefaultQuery) 
getCache().getQueryService().newQuery(queries[i]);
+      final ExecutionContext executionContext = new 
QueryExecutionContext(null, getCache(), query);
       SelectResults results =
           query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, 
getCache(), query);
 
       PartitionedRegion partitionedRegion = (PartitionedRegion) region;
       PartitionedRegionQueryEvaluator queryEvaluator =
           new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), 
partitionedRegion,
-              query, EMPTY_PARAMETERS, results, buckets);
+              query, executionContext, EMPTY_PARAMETERS, results, buckets);
 
       CollatingTestHook testHook = new CollatingTestHook(queryEvaluator);
       queryEvaluator.queryBuckets(testHook);
@@ -258,6 +265,7 @@ public class PRQueryDUnitTest extends CacheTestCase {
 
       DefaultQuery query = (DefaultQuery) getCache().getQueryService()
           .newQuery("select distinct * from /" + regionName);
+      final ExecutionContext executionContext = new 
QueryExecutionContext(null, getCache(), query);
       SelectResults results =
           query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, 
getCache(), query);
 
@@ -270,7 +278,7 @@ public class PRQueryDUnitTest extends CacheTestCase {
       PartitionedRegion partitionedRegion = (PartitionedRegion) region;
       PartitionedRegionQueryEvaluator queryEvaluator =
           new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), 
partitionedRegion,
-              query, EMPTY_PARAMETERS, results, buckets);
+              query, executionContext, EMPTY_PARAMETERS, results, buckets);
 
       assertThatThrownBy(() -> queryEvaluator.queryBuckets(null))
           .isInstanceOf(QueryException.class);
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PRQueryDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PRQueryDistributedTest.java
index 482d65d..3e6eeba 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PRQueryDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PRQueryDistributedTest.java
@@ -42,6 +42,7 @@ import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.DefaultQuery.TestHook;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
@@ -575,7 +576,8 @@ public class PRQueryDistributedTest implements Serializable 
{
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       hooks.put(spot, Boolean.TRUE);
     }
   }
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/LocalDataSetIndexingDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/LocalDataSetIndexingDUnitTest.java
index 604805c..199e78e 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/LocalDataSetIndexingDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/LocalDataSetIndexingDUnitTest.java
@@ -39,6 +39,8 @@ import org.apache.geode.cache.query.IndexType;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverAdapter;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache30.CacheSerializableRunnable;
@@ -149,6 +151,8 @@ public class LocalDataSetIndexingDUnitTest extends 
JUnit4CacheTestCase {
                   QueryService qs = pr1.getCache().getQueryService();
                   DefaultQuery query = (DefaultQuery) qs.newQuery(
                       "select distinct e1.value from /pr1 e1, /pr2  e2 where 
e1.value=e2.value");
+                  final ExecutionContext executionContext =
+                      new QueryExecutionContext(null, cache, query);
 
                   GemFireCacheImpl.getInstance().getLogger()
                       .fine(" Num BUCKET SET: " + localCust.getBucketSet());
@@ -175,7 +179,8 @@ public class LocalDataSetIndexingDUnitTest extends 
JUnit4CacheTestCase {
                   }
 
                   SelectResults r =
-                      (SelectResults) localCust.executeQuery(query, null, 
localCust.getBucketSet());
+                      (SelectResults) localCust.executeQuery(query, 
executionContext, null,
+                          localCust.getBucketSet());
 
                   GemFireCacheImpl.getInstance().getLogger().fine("Result :" + 
r.asList());
 
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryJUnitTest.java
index 09c9911..119a6fd 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryJUnitTest.java
@@ -52,6 +52,7 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.index.IndexProtocol;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
 
@@ -396,7 +397,8 @@ public class QueryJUnitTest {
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       if (spot == SPOTS.BEFORE_QUERY_EXECUTION) {
         try {
           barrier.await(8, TimeUnit.SECONDS);
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java
index 697d88b..447de42 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java
@@ -42,9 +42,12 @@ import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.data.TestData.MyValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverAdapter;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache.query.internal.QueryUtils;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
@@ -442,7 +445,9 @@ public class QueryServiceRegressionTest {
     String query =
         "select distinct e1.value from /pr1 e1, " + "/pr2  e2" + " where 
e1.value=e2.value";
     DefaultQuery cury = (DefaultQuery) 
CacheUtils.getQueryService().newQuery(query);
-    SelectResults r = (SelectResults) lds.executeQuery(cury, null, set);
+    final ExecutionContext executionContext =
+        new QueryExecutionContext(null, (InternalCache) cache, cury);
+    SelectResults r = (SelectResults) lds.executeQuery(cury, executionContext, 
null, set);
 
     if (!observer.isIndexesUsed) {
       fail("Indexes should have been used");
@@ -513,7 +518,9 @@ public class QueryServiceRegressionTest {
     String query =
         "select distinct e1.key from /pr1.entries e1,/pr2.entries  e2" + " 
where e1.value=e2.value";
     DefaultQuery cury = (DefaultQuery) 
CacheUtils.getQueryService().newQuery(query);
-    SelectResults r = (SelectResults) lds.executeQuery(cury, null, set);
+    final ExecutionContext executionContext =
+        new QueryExecutionContext(null, (InternalCache) cache, cury);
+    SelectResults r = (SelectResults) lds.executeQuery(cury, executionContext, 
null, set);
 
     if (!observer.isIndexesUsed) {
       fail("Indexes should have been used");
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryWithBucketParameterIntegrationTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryWithBucketParameterIntegrationTest.java
index f61d0e8..db14818 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryWithBucketParameterIntegrationTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryWithBucketParameterIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
@@ -105,23 +106,28 @@ public class QueryWithBucketParameterIntegrationTest {
 
   @Test
   public void testQueryExecuteWithEmptyBucketListExpectNoResults() throws 
Exception {
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, 
new HashSet<Integer>());
+    final ExecutionContext executionContext = new ExecutionContext(null, 
CacheUtils.getCache());
+    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, 
executionContext, null,
+        new HashSet<Integer>());
     assertTrue("Received: A non-empty result collection, expected : Empty 
result collection",
         r.isEmpty());
   }
 
   @Test
   public void testQueryExecuteWithNullBucketListExpectNonEmptyResultSet() 
throws Exception {
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, 
null);
+    final ExecutionContext executionContext = new ExecutionContext(null, 
CacheUtils.getCache());
+    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, 
executionContext, null, null);
     assertFalse("Received: An empty result collection, expected : Non-empty 
result collection",
         r.isEmpty());
   }
 
   @Test
   public void testQueryExecuteWithNonEmptyBucketListExpectNonEmptyResultSet() 
throws Exception {
+    final ExecutionContext executionContext = new ExecutionContext(null, 
CacheUtils.getCache());
     int nTestBucketNumber = 15;
     Set<Integer> nonEmptySet = createAndPopulateSet(nTestBucketNumber);
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, 
nonEmptySet);
+    SelectResults r =
+        (SelectResults) lds.executeQuery(queryExecutor, executionContext, 
null, nonEmptySet);
     assertFalse("Received: An empty result collection, expected : Non-empty 
result collection",
         r.isEmpty());
   }
@@ -129,8 +135,10 @@ public class QueryWithBucketParameterIntegrationTest {
   @Test(expected = QueryInvocationTargetException.class)
   public void 
testQueryExecuteWithLargerBucketListThanExistingExpectQueryInvocationTargetException()
       throws Exception {
+    final ExecutionContext executionContext = new ExecutionContext(null, 
CacheUtils.getCache());
     int nTestBucketNumber = 45;
     Set<Integer> overflowSet = createAndPopulateSet(nTestBucketNumber);
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, 
overflowSet);
+    SelectResults r =
+        (SelectResults) lds.executeQuery(queryExecutor, executionContext, 
null, overflowSet);
   }
 }
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
index 41add0e..e9d19cb 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache.query.internal;
 
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -24,10 +26,13 @@ import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.stubbing.Answer;
 
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.CacheRuntimeException;
 import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
 import org.apache.geode.internal.cache.InternalCache;
@@ -40,21 +45,21 @@ import org.apache.geode.test.awaitility.GeodeAwaitility;
  */
 public class QueryMonitorIntegrationTest {
 
-  // query expiration duration so long that the query never expires
+  // executionContext expiration duration so long that the executionContext 
never expires
   private static final int NEVER_EXPIRE_MILLIS = 100000;
 
   // much much smaller than default maximum wait time of GeodeAwaitility
   private static final int EXPIRE_QUICK_MILLIS = 1;
 
   private InternalCache cache;
-  private DefaultQuery query;
+  private ExecutionContext executionContext;
   private volatile CacheRuntimeException cacheRuntimeException;
   private volatile QueryExecutionCanceledException 
queryExecutionCanceledException;
 
   @Before
   public void before() {
     cache = mock(InternalCache.class);
-    query = mock(DefaultQuery.class);
+    executionContext = mock(ExecutionContext.class);
     cacheRuntimeException = null;
     queryExecutionCanceledException = null;
   }
@@ -73,15 +78,15 @@ public class QueryMonitorIntegrationTest {
           cache,
           NEVER_EXPIRE_MILLIS);
 
-      queryMonitor.monitorQueryThread(query);
+      queryMonitor.monitorQueryExecution(executionContext);
 
       queryMonitor.setLowMemory(true, 1);
 
-      verify(query, times(1))
+      verify(executionContext, times(1))
           
.setQueryCanceledException(any(QueryExecutionLowMemoryException.class));
 
       
assertThatThrownBy(QueryMonitor::throwExceptionIfQueryOnCurrentThreadIsCanceled,
-          "Expected setLowMemory(true,_) to cancel query immediately, but it 
didn't.",
+          "Expected setLowMemory(true,_) to cancel executionContext 
immediately, but it didn't.",
           QueryExecutionCanceledException.class);
     } finally {
       if (queryMonitor != null) {
@@ -98,7 +103,7 @@ public class QueryMonitorIntegrationTest {
   }
 
   @Test
-  public void 
monitorQueryThreadCancelsLongRunningQueriesAndSetsExceptionAndThrowsException() 
{
+  public void 
monitorQueryExecutionCancelsLongRunningQueriesAndSetsExceptionAndThrowsException()
 {
 
     QueryMonitor queryMonitor = new QueryMonitor(
         new ScheduledThreadPoolExecutor(1),
@@ -116,10 +121,10 @@ public class QueryMonitorIntegrationTest {
       return null;
     };
 
-    doAnswer(processSetQueryCanceledException).when(query)
+    doAnswer(processSetQueryCanceledException).when(executionContext)
         .setQueryCanceledException(any(CacheRuntimeException.class));
 
-    startQueryThread(queryMonitor, query);
+    startQueryThread(queryMonitor, executionContext);
 
     GeodeAwaitility.await().until(() -> cacheRuntimeException != null);
 
@@ -129,11 +134,55 @@ public class QueryMonitorIntegrationTest {
     assertThat(queryExecutionCanceledException).isNotNull();
   }
 
+  @Test
+  public void 
monitorMultipleQueryExecutionsThenStopMonitoringNoRemainingCancellationTasksRunning()
 {
+    cache = (InternalCache) new CacheFactory().set(LOCATORS, 
"").set(MCAST_PORT, "0").create();
+    final DefaultQuery query =
+        (DefaultQuery) cache.getQueryService().newQuery("SELECT DISTINCT * 
FROM /exampleRegion");
+    executionContext = new QueryExecutionContext(null, cache, query);
+    final ExecutionContext executionContext2 = new QueryExecutionContext(null, 
cache, query);
+
+    final ScheduledThreadPoolExecutor queryMonitorExecutor = new 
ScheduledThreadPoolExecutor(1);
+
+    final QueryMonitor queryMonitor = new QueryMonitor(
+        queryMonitorExecutor,
+        cache,
+        NEVER_EXPIRE_MILLIS);
+
+    // We want to ensure isolation of cancellation tasks for different query 
threads/executions.
+    // Here we ensure that if we monitor/unmonitor two executions in different 
threads that
+    // both cancellation tasks are removed from the executor queue.
+    final Thread firstClientQueryThread = new Thread(() -> {
+      queryMonitor.monitorQueryExecution(executionContext);
+
+      final Thread secondClientQueryThread = new Thread(() -> {
+        queryMonitor.monitorQueryExecution(executionContext2);
+        queryMonitor.stopMonitoringQueryExecution(executionContext2);
+      });
+
+      secondClientQueryThread.start();
+      try {
+        secondClientQueryThread.join();
+      } catch (final InterruptedException ex) {
+        Assert.fail("Unexpected exception while executing query. Details:\n"
+            + ExceptionUtils.getStackTrace(ex));
+      }
+
+      queryMonitor.stopMonitoringQueryExecution(executionContext);
+    });
+
+    firstClientQueryThread.start();
+
+    // Both cancellation tasks should have been removed upon stopping 
monitoring the queries on
+    // each thread, so the task queue size should be 0
+    GeodeAwaitility.await().until(() -> queryMonitorExecutor.getQueue().size() 
== 0);
+  }
+
   private void startQueryThread(final QueryMonitor queryMonitor,
-      final DefaultQuery query) {
+      final ExecutionContext executionContext) {
 
     final Thread queryThread = new Thread(() -> {
-      queryMonitor.monitorQueryThread(query);
+      queryMonitor.monitorQueryExecution(executionContext);
 
       while (true) {
         try {
@@ -144,7 +193,7 @@ public class QueryMonitorIntegrationTest {
           break;
         } catch (final InterruptedException e) {
           Thread.currentThread().interrupt();
-          throw new AssertionError("Simulated query thread unexpectedly 
interrupted.");
+          throw new AssertionError("Simulated executionContext thread 
unexpectedly interrupted.");
         }
       }
     });
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexJUnitTest.java
index ad96bae..d4161f6 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexJUnitTest.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.DefaultQuery.TestHook;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.internal.cache.persistence.query.CloseableIterator;
 import org.apache.geode.test.junit.categories.OQLIndexTest;
 
@@ -346,7 +347,8 @@ public class CompactRangeIndexJUnitTest {
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       try {
         switch (spot) {
           case ATTEMPT_REMOVE:
@@ -395,7 +397,8 @@ public class CompactRangeIndexJUnitTest {
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       try {
         switch (spot) {
           case ATTEMPT_REMOVE:
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
index 1c9fcc2..98c3176 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorIntegrationTest.java
@@ -80,7 +80,8 @@ public class PartitionedRegionQueryEvaluatorIntegrationTest {
 
     // running the algorithm and getting the list of bucktes to grab
     PartitionedRegionQueryEvaluator evalr =
-        new PartitionedRegionQueryEvaluator(pr.getSystem(), pr, null, null, 
null, bucketsToQuery);
+        new PartitionedRegionQueryEvaluator(pr.getSystem(), pr, null, null, 
null, null,
+            bucketsToQuery);
     Map n2bMap = null;
     try {
       n2bMap = evalr.buildNodeToBucketMap();
@@ -99,7 +100,8 @@ public class PartitionedRegionQueryEvaluatorIntegrationTest {
       assertTrue(" Bucket with Id = " + i + " not present in bucketList.",
           buckList.contains(new Integer(i)));
     }
-    clearAllPartitionedRegion(pr);
+
+    pr.destroyRegion();
   }
 
   /**
@@ -136,12 +138,6 @@ public class 
PartitionedRegionQueryEvaluatorIntegrationTest {
     }
   }
 
-  private void clearAllPartitionedRegion(PartitionedRegion pr) {
-    InternalCache cache = pr.getCache();
-    Region allPR = PartitionedRegionHelper.getPRRoot(cache);
-    allPR.clear();
-  }
-
   /**
    * This function decides number of the nodes in the list of bucket2Node 
region
    */
diff --git 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index e035104..75abd4a 100644
--- 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -351,7 +351,7 @@ fromData,16
 toData,16
 
 
org/apache/geode/distributed/internal/streaming/StreamingOperation$StreamingReplyMessage,2
-fromData,420
+fromData,422
 toData,85
 
 org/apache/geode/distributed/internal/tcpserver/InfoRequest,2
diff --git 
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
 
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
index 7d8c447..bcb70d1 100644
--- 
a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
+++ 
b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
@@ -54,7 +54,7 @@ public class MonitorQueryUnderContentionBenchmark {
   private static final int START_DELAY_RANGE_MILLIS = 100;
 
   /*
-   * Delay, from time startOneSimulatedQuery() is called, until 
monitorQueryThread() is called.
+   * Delay, from time startOneSimulatedQuery() is called, until 
monitorQueryExecution() is called.
    */
   private static final int QUERY_INITIAL_DELAY = 0;
 
@@ -136,8 +136,8 @@ public class MonitorQueryUnderContentionBenchmark {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   // @Warmup we don't warm up because our @Setup warms us up
   public void monitorQuery() {
-    queryMonitor.monitorQueryThread(query);
-    queryMonitor.stopMonitoringQueryThread(query);
+    queryMonitor.monitorQueryExecution(query);
+    queryMonitor.stopMonitoringQueryExecution(query);
   }
 
   private void generateLoad(final ScheduledExecutorService executorService,
@@ -160,9 +160,9 @@ public class MonitorQueryUnderContentionBenchmark {
       int startDelayRangeMillis, int completeDelayRangeMillis) {
     executorService.schedule(() -> {
       final DefaultQuery query = createDefaultQuery();
-      queryMonitor.monitorQueryThread(query);
+      queryMonitor.monitorQueryExecution(query);
       executorService.schedule(() -> {
-        queryMonitor.stopMonitoringQueryThread(query);
+        queryMonitor.stopMonitoringQueryExecution(query);
       },
           gaussianLong(completeDelayRangeMillis),
           TimeUnit.MILLISECONDS);
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 5a38a81..92b3be2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -22,16 +22,12 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.annotations.internal.MutableForTesting;
-import org.apache.geode.cache.CacheRuntimeException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.internal.ProxyCache;
 import org.apache.geode.cache.client.internal.ServerProxy;
@@ -82,8 +78,6 @@ public class DefaultQuery implements Query {
 
   private final QueryStatistics stats;
 
-  private Optional<ScheduledFuture> cancelationTask;
-
   private boolean traceOn = false;
 
   @Immutable
@@ -110,8 +104,6 @@ public class DefaultQuery implements Query {
    */
   public static final Object NULL_RESULT = new Object();
 
-  private volatile CacheRuntimeException queryCancelledException;
-
   private ProxyCache proxyCache;
 
   private boolean isCqQuery = false;
@@ -155,9 +147,6 @@ public class DefaultQuery implements Query {
   private static final ThreadLocal<Map<String, Set<String>>> 
pdxClassToMethodsMap =
       ThreadLocal.withInitial(HashMap::new);
 
-  static final ThreadLocal<AtomicBoolean> queryCanceled =
-      ThreadLocal.withInitial(AtomicBoolean::new);
-
   public static void setPdxClasstoMethodsmap(Map<String, Set<String>> map) {
     pdxClassToMethodsMap.set(map);
   }
@@ -166,14 +155,6 @@ public class DefaultQuery implements Query {
     return pdxClassToMethodsMap.get();
   }
 
-  public Optional<ScheduledFuture> getCancelationTask() {
-    return cancelationTask;
-  }
-
-  public void setCancelationTask(final ScheduledFuture cancelationTask) {
-    this.cancelationTask = Optional.of(cancelationTask);
-  }
-
   /**
    * Should be constructed from DefaultQueryService
    *
@@ -195,7 +176,6 @@ public class DefaultQuery implements Query {
     this.traceOn = compiler.isTraceRequested() || QUERY_VERBOSE;
     this.cache = cache;
     this.stats = new DefaultQueryStatistics();
-    this.cancelationTask = Optional.empty();
   }
 
   /**
@@ -247,18 +227,19 @@ public class DefaultQuery implements Query {
 
     Object result = null;
     Boolean initialPdxReadSerialized = 
this.cache.getPdxReadSerializedOverride();
+    final ExecutionContext context = new QueryExecutionContext(params, 
this.cache, this);
+
     try {
       // Setting the readSerialized flag for local queries
       this.cache.setPdxReadSerializedOverride(true);
-      ExecutionContext context = new QueryExecutionContext(params, this.cache, 
this);
       indexObserver = this.startTrace();
       if (qe != null) {
         if (DefaultQuery.testHook != null) {
           
DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_EXECUTION,
-              this);
+              this, context);
         }
 
-        result = qe.executeQuery(this, params, null);
+        result = qe.executeQuery(this, context, params, null);
         // For local queries returning pdx objects wrap the resultset with
         // ResultsCollectionPdxDeserializerWrapper
         // which deserializes these pdx objects.
@@ -278,10 +259,9 @@ public class DefaultQuery implements Query {
         // Add current thread to be monitored by QueryMonitor.
         // In case of partitioned region it will be added before the query 
execution
         // starts on the Local Buckets.
-        queryMonitor.monitorQueryThread(this);
+        queryMonitor.monitorQueryExecution(context);
       }
 
-      context.setCqQueryContext(this.isCqQuery);
       result = executeUsingContext(context);
       // Only wrap/copy results when copy on read is set and an index is used
       // This is because when an index is used, the results are actual 
references to values in the
@@ -317,34 +297,17 @@ public class DefaultQuery implements Query {
       }
       return result;
     } catch (QueryExecutionCanceledException ignore) {
-      return reinterpretQueryExecutionCanceledException();
+      return context.reinterpretQueryExecutionCanceledException();
     } finally {
       this.cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
       if (queryMonitor != null) {
-        queryMonitor.stopMonitoringQueryThread(this);
+        queryMonitor.stopMonitoringQueryExecution(context);
       }
       this.endTrace(indexObserver, startTime, result);
     }
   }
 
   /**
-   * This method attempts to reintrepret a {@link 
QueryExecutionCanceledException} using the
-   * the value returned by {@link #getQueryCanceledException} (set by the 
{@link QueryMonitor}).
-   *
-   * @throws if {@link #getQueryCanceledException} doesn't return {@code null} 
then throw that
-   *         {@link CacheRuntimeException}, otherwise throw {@link 
QueryExecutionCanceledException}
-   */
-  private Object reinterpretQueryExecutionCanceledException() {
-    final CacheRuntimeException queryCanceledException = 
getQueryCanceledException();
-    if (queryCanceledException != null) {
-      throw queryCanceledException;
-    } else {
-      throw new QueryExecutionCanceledException(
-          "Query was canceled. It may be due to low memory or the query was 
running longer than the MAX_QUERY_EXECUTION_TIME.");
-    }
-  }
-
-  /**
    * For Order by queries ,since they are already ordered by the comparator && 
it takes care of
    * conversion, we do not have to wrap it in a wrapper
    */
@@ -401,9 +364,8 @@ public class DefaultQuery implements Query {
       }
     }
 
-    ExecutionContext context = new QueryExecutionContext(parameters, 
this.cache, this);
+    final ExecutionContext context = new QueryExecutionContext(parameters, 
this.cache, this);
     context.setBucketRegion(pr, bukRgn);
-    context.setCqQueryContext(this.isCqQuery);
 
     // Check if QueryMonitor is enabled, if enabled add query to be monitored.
     QueryMonitor queryMonitor = this.cache.getQueryMonitor();
@@ -413,7 +375,7 @@ public class DefaultQuery implements Query {
     // QueryMonitor Service.
     if (queryMonitor != null && PRQueryProcessor.NUM_THREADS > 1) {
       // Add current thread to be monitored by QueryMonitor.
-      queryMonitor.monitorQueryThread(this);
+      queryMonitor.monitorQueryExecution(context);
     }
 
     Object result = null;
@@ -421,7 +383,7 @@ public class DefaultQuery implements Query {
       result = executeUsingContext(context);
     } finally {
       if (queryMonitor != null && PRQueryProcessor.NUM_THREADS > 1) {
-        queryMonitor.stopMonitoringQueryThread(this);
+        queryMonitor.stopMonitoringQueryExecution(context);
       }
 
       int resultSize = 0;
@@ -454,7 +416,8 @@ public class DefaultQuery implements Query {
       observer.beforeQueryEvaluation(this.compiledQuery, context);
 
       if (DefaultQuery.testHook != null) {
-        
DefaultQuery.testHook.doTestHook(TestHook.SPOTS.BEFORE_QUERY_DEPENDENCY_COMPUTATION,
 this);
+        
DefaultQuery.testHook.doTestHook(TestHook.SPOTS.BEFORE_QUERY_DEPENDENCY_COMPUTATION,
 this,
+            context);
       }
       Object results = null;
       try {
@@ -462,11 +425,11 @@ public class DefaultQuery implements Query {
         // first pre-compute dependencies, cached in the context.
         this.compiledQuery.computeDependencies(context);
         if (testHook != null) {
-          
testHook.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_EXECUTION, this);
+          
testHook.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_EXECUTION, this, 
context);
         }
         results = this.compiledQuery.evaluate(context);
       } catch (QueryExecutionCanceledException ignore) {
-        reinterpretQueryExecutionCanceledException();
+        context.reinterpretQueryExecutionCanceledException();
       } finally {
         observer.afterQueryEvaluation(results);
       }
@@ -477,7 +440,7 @@ public class DefaultQuery implements Query {
       updateStatistics(endTime - startTime);
       pdxClassToFieldsMap.remove();
       pdxClassToMethodsMap.remove();
-      queryCanceled.remove();
+      ExecutionContext.isCanceled.remove();
       ((TXManagerImpl) 
this.cache.getCacheTransactionManager()).unpauseTransaction(tx);
     }
   }
@@ -706,25 +669,6 @@ public class DefaultQuery implements Query {
     this.serverProxy = serverProxy;
   }
 
-  /**
-   * Check to see if the query execution got canceled. The query gets canceled 
by the QueryMonitor
-   * if it takes more than the max query execution time or low memory 
situations
-   */
-  public boolean isCanceled() {
-    return getQueryCanceledException() != null;
-  }
-
-  public CacheRuntimeException getQueryCanceledException() {
-    return queryCancelledException;
-  }
-
-  /**
-   * The query gets canceled by the QueryMonitor with the reason being 
specified
-   */
-  public void setQueryCanceledException(final CacheRuntimeException 
queryCanceledException) {
-    this.queryCancelledException = queryCanceledException;
-  }
-
   public void setIsCqQuery(boolean isCqQuery) {
     this.isCqQuery = isCqQuery;
   }
@@ -753,9 +697,6 @@ public class DefaultQuery implements Query {
   public String toString() {
     StringBuilder sb = new StringBuilder("Query String = ");
     sb.append(this.queryString);
-    sb.append(';');
-    sb.append("isCancelled = ");
-    sb.append(this.isCanceled());
     sb.append("; Total Executions = ");
     sb.append(this.numExecutions);
     sb.append("; Total Execution Time = ");
@@ -886,7 +827,8 @@ public class DefaultQuery implements Query {
         LocalDataSet localDataSet =
             (LocalDataSet) 
PartitionRegionHelper.getLocalDataForContext(context);
         Set<Integer> buckets = localDataSet.getBucketSet();
-        result = qe.executeQuery(this, params, buckets);
+        final ExecutionContext executionContext = new ExecutionContext(null, 
cache);
+        result = qe.executeQuery(this, executionContext, params, buckets);
         return result;
       } else {
         // Not supported on regions other than PartitionRegion.
@@ -1035,7 +977,8 @@ public class DefaultQuery implements Query {
      *        more than one physical location in the query processing code.
      * @param query nullable, DefaultQuery, for SPOTS in the DefaultQuery class
      */
-    void doTestHook(SPOTS spot, DefaultQuery query);
+    void doTestHook(SPOTS spot, DefaultQuery query,
+        ExecutionContext executionContext);
   }
 
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java
index 3385530..2999e96 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java
@@ -21,9 +21,13 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.geode.cache.CacheRuntimeException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.query.AmbiguousNameException;
 import org.apache.geode.cache.query.NameResolutionException;
@@ -92,6 +96,11 @@ public class ExecutionContext {
   private Object currentProjectionField = null;
   private boolean isPRQueryNode = false;
 
+  private Optional<ScheduledFuture> cancelationTask;
+  private volatile CacheRuntimeException canceledException;
+  static final ThreadLocal<AtomicBoolean> isCanceled =
+      ThreadLocal.withInitial(AtomicBoolean::new);
+
   /**
    * Param specialIteratorVar name of special variable to use to denote the 
current iteration
    * element. Used to implement the "this" var in the query shortcut methods
@@ -101,6 +110,15 @@ public class ExecutionContext {
   public ExecutionContext(Object[] bindArguments, InternalCache cache) {
     this.bindArguments = bindArguments;
     this.cache = cache;
+    this.cancelationTask = Optional.empty();
+  }
+
+  Optional<ScheduledFuture> getCancelationTask() {
+    return cancelationTask;
+  }
+
+  void setCancelationTask(final ScheduledFuture cancelationTask) {
+    this.cancelationTask = Optional.of(cancelationTask);
   }
 
   public CachePerfStats getCachePerfStats() {
@@ -603,10 +621,6 @@ public class ExecutionContext {
     throw new UnsupportedOperationException("Method should not have been 
called");
   }
 
-  public void setCqQueryContext(boolean cqQuery) {
-    throw new UnsupportedOperationException("Method should not have been 
called");
-  }
-
   public Query getQuery() {
     throw new UnsupportedOperationException("Method should not have been 
called");
   }
@@ -647,4 +661,36 @@ public class ExecutionContext {
     return this.isPRQueryNode;
   }
 
+  /**
+   * Check to see if the query execution was canceled. The query gets canceled 
by the QueryMonitor
+   * if it takes more than the max query execution time or low memory 
situations
+   */
+  public boolean isCanceled() {
+    return getQueryCanceledException() != null;
+  }
+
+  public CacheRuntimeException getQueryCanceledException() {
+    return canceledException;
+  }
+
+  public void setQueryCanceledException(final CacheRuntimeException 
queryCanceledException) {
+    this.canceledException = queryCanceledException;
+  }
+
+  /**
+   * This method attempts to reintrepret a {@link 
QueryExecutionCanceledException} using the
+   * the value returned by {@link #getQueryCanceledException} (set by the 
{@link QueryMonitor}).
+   *
+   * @throws if {@link #getQueryCanceledException} doesn't return {@code null} 
then throw that
+   *         {@link CacheRuntimeException}, otherwise throw {@link 
QueryExecutionCanceledException}
+   */
+  Object reinterpretQueryExecutionCanceledException() {
+    final CacheRuntimeException queryCanceledException = 
getQueryCanceledException();
+    if (queryCanceledException != null) {
+      throw queryCanceledException;
+    } else {
+      throw new QueryExecutionCanceledException(
+          "Query was canceled. It may be due to low memory or the query was 
running longer than the MAX_QUERY_EXECUTION_TIME.");
+    }
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java
index 696e501..7371fde 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java
@@ -72,6 +72,7 @@ public class QueryExecutionContext extends ExecutionContext {
   public QueryExecutionContext(Object[] bindArguments, InternalCache cache, 
Query query) {
     super(bindArguments, cache);
     this.query = query;
+    this.cqQueryContext = ((DefaultQuery) query).isCqQuery();
   }
 
   @Override
@@ -142,11 +143,6 @@ public class QueryExecutionContext extends 
ExecutionContext {
   }
 
   @Override
-  public void setCqQueryContext(boolean cqQuery) {
-    this.cqQueryContext = cqQuery;
-  }
-
-  @Override
   public boolean isCqQueryContext() {
     return this.cqQueryContext;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutor.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutor.java
index 8042e8e..b5cf9a7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutor.java
@@ -27,8 +27,9 @@ import org.apache.geode.cache.query.TypeMismatchException;
  * @since GemFire 5.5
  */
 public interface QueryExecutor {
-  // TODO Yogesh , fix this signature
-  Object executeQuery(DefaultQuery query, Object[] parameters, Set buckets)
+  Object executeQuery(DefaultQuery query,
+      ExecutionContext executionContext,
+      Object[] parameters, Set buckets)
       throws FunctionDomainException, TypeMismatchException, 
NameResolutionException,
       QueryInvocationTargetException;
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index b04cab5..26d421e 100755
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -43,24 +43,25 @@ import org.apache.geode.internal.logging.LogService;
  * with {@code isLowMemory=true}. In that mode, any attempt to monitor a (new) 
query will
  * throw an exception.
  *
- * The {@link #monitorQueryThread(DefaultQuery)} method initiates monitoring 
of a query. {@link
- * #stopMonitoringQueryThread(DefaultQuery)} stops monitoring a query.
+ * The {@link #monitorQueryExecution(ExecutionContext)} method initiates 
monitoring of a query.
+ * {@link
+ * #stopMonitoringQueryExecution(ExecutionContext)} stops monitoring a query.
  *
  * If the {@link QueryMonitor} determines a query needs to be canceled: either 
because it is taking
  * too long, or because memory is running low, it does two things:
  *
  * <ul>
  * <li>registers an exception on the query via
- * {@link DefaultQuery#setQueryCanceledException(CacheRuntimeException)}</li>
- * <li>sets the {@link DefaultQuery#queryCanceled} thread-local variable to 
{@code true}
+ * {@link 
ExecutionContext#setQueryCanceledException(CacheRuntimeException)}</li>
+ * <li>sets the {@link ExecutionContext#queryCanceled} thread-local variable 
to {@code true}
  * so that subsequent calls to {@link 
#throwExceptionIfQueryOnCurrentThreadIsCanceled()} will throw
  * an exception</li>
  * </ul>
  *
  * Code outside this class, that wishes to participate in cooperative 
cancelation of queries calls
  * {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} at various yield 
points. In catch
- * blocks, {@link DefaultQuery#getQueryCanceledException()} is interrogated to 
learn the cancelation
- * cause.
+ * blocks, {@link ExecutionContext#getQueryCanceledException()} is 
interrogated to learn the
+ * cancelation cause.
  *
  * @since GemFire 6.0
  */
@@ -89,7 +90,7 @@ public class QueryMonitor {
    * timeout deadline was reached, resulting in queue growth.
    *
    * Setting the remove-on-cancel-policy to {@code true} changes that behavior 
so tasks are removed
-   * immediately upon cancelation (via {@link 
#stopMonitoringQueryThread(DefaultQuery)}).
+   * immediately upon cancelation (via {@link 
#stopMonitoringQueryExecution(ExecutionContext)}).
    *
    * @param executor is responsible for processing scheduled cancelation tasks
    * @param cache is interrogated via {@link 
InternalCache#isQueryMonitorDisabledForLowMemory} at
@@ -116,8 +117,8 @@ public class QueryMonitor {
    * Must not be called from a thread that is not the query thread, because 
this class uses a
    * ThreadLocal on the query thread!
    */
-  public void monitorQueryThread(final DefaultQuery query) {
-    monitorQueryThread(query, defaultMaxQueryExecutionTime);
+  public void monitorQueryExecution(final ExecutionContext executionContext) {
+    monitorQueryExecution(executionContext, defaultMaxQueryExecutionTime);
   }
 
   /**
@@ -127,18 +128,19 @@ public class QueryMonitor {
    * Must not be called from a thread that is not the query thread, because 
this class uses a
    * ThreadLocal on the query thread!
    */
-  private void monitorQueryThread(final DefaultQuery query,
+  private void monitorQueryExecution(final ExecutionContext executionContext,
       final long maxQueryExecutionTime) {
 
     // cq query is not monitored
-    if (query.isCqQuery()) {
+    if (executionContext.isCqQueryContext()) {
       return;
     }
 
-    query.setCancelationTask(scheduleCancelationTask(query, 
maxQueryExecutionTime));
+    executionContext
+        .setCancelationTask(scheduleCancelationTask(executionContext, 
maxQueryExecutionTime));
 
     if (logger.isDebugEnabled()) {
-      logDebug(query, "Adding thread to QueryMonitor.");
+      logDebug(executionContext, "Adding thread to QueryMonitor.");
     }
   }
 
@@ -148,11 +150,11 @@ public class QueryMonitor {
    * Must not be called from a thread that is not the query thread, because 
this class uses a
    * ThreadLocal on the query thread!
    */
-  public void stopMonitoringQueryThread(final DefaultQuery query) {
-    query.getCancelationTask().ifPresent(task -> task.cancel(false));
+  public void stopMonitoringQueryExecution(final ExecutionContext 
executionContext) {
+    executionContext.getCancelationTask().ifPresent(task -> 
task.cancel(false));
 
     if (logger.isDebugEnabled()) {
-      logDebug(query, "Query completed before cancelation.");
+      logDebug(executionContext, "Query completed before cancelation.");
     }
   }
 
@@ -164,7 +166,7 @@ public class QueryMonitor {
    * @throws QueryExecutionCanceledException if the query has been canceled
    */
   public static void throwExceptionIfQueryOnCurrentThreadIsCanceled() {
-    if (DefaultQuery.queryCanceled.get().get()) {
+    if (ExecutionContext.isCanceled.get().get()) {
       throw new QueryExecutionCanceledException();
     }
   }
@@ -215,12 +217,12 @@ public class QueryMonitor {
         long delay,
         TimeUnit unit,
         ScheduledExecutorService scheduledExecutorService,
-        DefaultQuery query);
+        ExecutionContext executionContext);
 
     boolean isLowMemory();
 
     CacheRuntimeException createCancelationException(long timeLimitMillis,
-        DefaultQuery query);
+        ExecutionContext executionContext);
   }
 
   /**
@@ -257,18 +259,18 @@ public class QueryMonitor {
       public ScheduledFuture<?> schedule(final Runnable command, final long 
delay,
           final TimeUnit unit,
           final ScheduledExecutorService scheduledExecutorService,
-          final DefaultQuery query) {
+          final ExecutionContext executionContext) {
         return scheduledExecutorService.schedule(command, delay, unit);
       }
 
       @Override
       public CacheRuntimeException createCancelationException(final long 
timeLimitMillis,
-          final DefaultQuery query) {
+          final ExecutionContext executionContext) {
         final String message = String.format(
             "Query execution canceled after exceeding max execution time 
%sms.",
             timeLimitMillis);
         if (logger.isInfoEnabled()) {
-          logger.info(String.format("%s %s", message, query));
+          logger.info(String.format("%s %s", message, executionContext));
         }
         return new QueryExecutionTimeoutException(message);
       }
@@ -307,16 +309,16 @@ public class QueryMonitor {
       public ScheduledFuture<?> schedule(final Runnable command, final long 
timeLimitMillis,
           final TimeUnit unit,
           final ScheduledExecutorService scheduledExecutorService,
-          final DefaultQuery query) {
+          final ExecutionContext executionContext) {
         final CacheRuntimeException lowMemoryException =
-            createCancelationException(timeLimitMillis, query);
-        query.setQueryCanceledException(lowMemoryException);
+            createCancelationException(timeLimitMillis, executionContext);
+        executionContext.setQueryCanceledException(lowMemoryException);
         throw lowMemoryException;
       }
 
       @Override
       public CacheRuntimeException createCancelationException(final long 
timeLimitMillis,
-          final DefaultQuery query) {
+          final ExecutionContext executionContext) {
         return new QueryExecutionLowMemoryException(
             String.format(
                 "Query execution canceled due to memory threshold crossed in 
system, memory used: %s bytes.",
@@ -348,12 +350,12 @@ public class QueryMonitor {
 
   }
 
-  private ScheduledFuture<?> scheduleCancelationTask(final DefaultQuery query,
+  private ScheduledFuture<?> scheduleCancelationTask(final ExecutionContext 
executionContext,
       final long timeLimitMillis) {
 
-    // Make ThreadLocal queryCanceled available to closure, which will run in 
a separate thread
+    // Make ThreadLocal isCanceled available to closure, which will run in a 
separate thread
     final AtomicBoolean queryCanceledThreadLocal =
-        DefaultQuery.queryCanceled.get();
+        ExecutionContext.isCanceled.get();
 
     /*
      * This is where the GoF "State" design pattern comes home to roost.
@@ -369,19 +371,20 @@ public class QueryMonitor {
      */
     return memoryState.schedule(() -> {
       final CacheRuntimeException exception = memoryState
-          .createCancelationException(timeLimitMillis, query);
+          .createCancelationException(timeLimitMillis, executionContext);
 
-      query.setQueryCanceledException(exception);
+      executionContext.setQueryCanceledException(exception);
       queryCanceledThreadLocal.set(true);
 
-    }, timeLimitMillis, TimeUnit.MILLISECONDS, executor, query);
+    }, timeLimitMillis, TimeUnit.MILLISECONDS, executor, executionContext);
   }
 
-  private void logDebug(final DefaultQuery query, final String message) {
+  private void logDebug(final ExecutionContext executionContext, final String 
message) {
     final Thread queryThread = Thread.currentThread();
     logger.debug(
         message + " QueryMonitor size is: {}, Thread (id): {}, Query: {}, 
Thread is : {}",
-        executor.getQueue().size(), queryThread.getId(), 
query.getQueryString(),
+        executor.getQueue().size(), queryThread.getId(),
+        executionContext.getQuery().getQueryString(),
         queryThread);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
index c13f82e..f8cc2e6 100755
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
@@ -192,7 +192,7 @@ public class HashIndex extends AbstractIndex {
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook.doTestHook(
             
DefaultQuery.TestHook.SPOTS.BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION,
-            null);
+            null, null);
       }
       Object newKey = TypeUtils.indexKeyFor(key);
       if (newKey.equals(QueryService.UNDEFINED)) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
index 622a17a..7acaf79 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
@@ -103,7 +103,7 @@ public class MemoryIndexStore implements IndexStore {
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook.doTestHook(
             
DefaultQuery.TestHook.SPOTS.BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION,
-            null);
+            null, null);
       }
 
       // Check if reverse-map is present.
@@ -152,7 +152,8 @@ public class MemoryIndexStore implements IndexStore {
           IndexElemArray elemArray = new IndexElemArray();
           if (DefaultQuery.testHook != null) {
             DefaultQuery.testHook.doTestHook(
-                
DefaultQuery.TestHook.SPOTS.BEGIN_TRANSITION_FROM_REGION_ENTRY_TO_ELEMARRAY, 
null);
+                
DefaultQuery.TestHook.SPOTS.BEGIN_TRANSITION_FROM_REGION_ENTRY_TO_ELEMARRAY, 
null,
+                null);
           }
           elemArray.add(regionEntries);
           elemArray.add(re);
@@ -162,12 +163,12 @@ public class MemoryIndexStore implements IndexStore {
           if (DefaultQuery.testHook != null) {
             DefaultQuery.testHook
                 
.doTestHook(DefaultQuery.TestHook.SPOTS.TRANSITIONED_FROM_REGION_ENTRY_TO_ELEMARRAY,
-                    null);
+                    null, null);
           }
           if (DefaultQuery.testHook != null) {
             DefaultQuery.testHook.doTestHook(
                 
DefaultQuery.TestHook.SPOTS.COMPLETE_TRANSITION_FROM_REGION_ENTRY_TO_ELEMARRAY,
-                null);
+                null, null);
           }
         } else if (regionEntries instanceof IndexConcurrentHashSet) {
           // This synchronized is for avoiding conflcts with remove of
@@ -191,7 +192,7 @@ public class MemoryIndexStore implements IndexStore {
               if (DefaultQuery.testHook != null) {
                 DefaultQuery.testHook.doTestHook(
                     
DefaultQuery.TestHook.SPOTS.BEGIN_TRANSITION_FROM_ELEMARRAY_TO_CONCURRENT_HASH_SET,
-                    null);
+                    null, null);
               }
               // on a remove from the elem array, another thread could start 
and complete its remove
               // at this point, that is why we need to replace before adding 
the elem array elements
@@ -205,7 +206,7 @@ public class MemoryIndexStore implements IndexStore {
                 if (DefaultQuery.testHook != null) {
                   DefaultQuery.testHook
                       
.doTestHook(DefaultQuery.TestHook.SPOTS.TRANSITIONED_FROM_ELEMARRAY_TO_TOKEN,
-                          null);
+                          null, null);
                 }
                 set.add(re);
                 set.addAll(elemArray);
@@ -221,7 +222,7 @@ public class MemoryIndexStore implements IndexStore {
                 if (DefaultQuery.testHook != null) {
                   DefaultQuery.testHook.doTestHook(
                       
DefaultQuery.TestHook.SPOTS.COMPLETE_TRANSITION_FROM_ELEMARRAY_TO_CONCURRENT_HASH_SET,
-                      null);
+                      null, null);
                 }
               }
             } else {
@@ -304,7 +305,7 @@ public class MemoryIndexStore implements IndexStore {
     try {
       Object newKey = convertToIndexKey(key, entry);
       if (DefaultQuery.testHook != null) {
-        
DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.ATTEMPT_REMOVE, 
null);
+        
DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.ATTEMPT_REMOVE, 
null, null);
       }
       boolean retry = false;
       do {
@@ -312,7 +313,7 @@ public class MemoryIndexStore implements IndexStore {
         Object regionEntries = this.valueToEntriesMap.get(newKey);
         if (regionEntries == TRANSITIONING_TOKEN) {
           if (DefaultQuery.testHook != null) {
-            
DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.ATTEMPT_RETRY, 
null);
+            
DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.ATTEMPT_RETRY, 
null, null);
           }
           retry = true;
           continue;
@@ -332,12 +333,13 @@ public class MemoryIndexStore implements IndexStore {
             Collection entries = (Collection) regionEntries;
             if (DefaultQuery.testHook != null) {
               DefaultQuery.testHook
-                  
.doTestHook(DefaultQuery.TestHook.SPOTS.BEGIN_REMOVE_FROM_ELEM_ARRAY, null);
+                  
.doTestHook(DefaultQuery.TestHook.SPOTS.BEGIN_REMOVE_FROM_ELEM_ARRAY, null, 
null);
             }
             found = entries.remove(entry);
             if (DefaultQuery.testHook != null) {
               DefaultQuery.testHook
-                  
.doTestHook(DefaultQuery.TestHook.SPOTS.REMOVE_CALLED_FROM_ELEM_ARRAY, null);
+                  
.doTestHook(DefaultQuery.TestHook.SPOTS.REMOVE_CALLED_FROM_ELEM_ARRAY, null,
+                      null);
             }
             // This could be IndexElementArray and might be changing to Set
             // If the remove occurred before changing to a set, then next time 
it will not be
@@ -365,7 +367,8 @@ public class MemoryIndexStore implements IndexStore {
             }
             if (DefaultQuery.testHook != null) {
               DefaultQuery.testHook
-                  
.doTestHook(DefaultQuery.TestHook.SPOTS.COMPLETE_REMOVE_FROM_ELEM_ARRAY, null);
+                  
.doTestHook(DefaultQuery.TestHook.SPOTS.COMPLETE_REMOVE_FROM_ELEM_ARRAY, null,
+                      null);
             }
           }
         }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
index 4c96375..de0a549 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/streaming/StreamingOperation.java
@@ -547,7 +547,7 @@ public abstract class StreamingOperation {
             if (DefaultQuery.testHook != null) {
               DefaultQuery.testHook.doTestHook(
                   
DefaultQuery.TestHook.SPOTS.BEFORE_ADD_OR_UPDATE_MAPPING_OR_DESERIALIZING_NTH_STREAMINGOPERATION,
-                  null);
+                  null, null);
             }
             if (isQueryMessageProcessor && QueryMonitor.isLowMemory()) {
               lowMemoryDetected = true;
@@ -571,7 +571,7 @@ public abstract class StreamingOperation {
             if (DefaultQuery.testHook != null) {
               DefaultQuery.testHook.doTestHook(
                   
DefaultQuery.TestHook.SPOTS.LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION,
-                  null);
+                  null, null);
             }
           }
         } finally {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
index 6594fd2..ba3f5c9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalDataSet.java
@@ -52,6 +52,8 @@ import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.TypeMismatchException;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryExecutor;
 import org.apache.geode.cache.query.internal.QueryObserver;
 import org.apache.geode.cache.snapshot.RegionSnapshotService;
@@ -141,8 +143,9 @@ public class LocalDataSet implements Region, QueryExecutor {
     QueryService qs = getCache().getLocalQueryService();
     DefaultQuery query = (DefaultQuery) qs
         .newQuery("select * from " + getFullPath() + " this where " + 
queryPredicate);
+    final ExecutionContext executionContext = new QueryExecutionContext(null, 
getCache(), query);
     Object[] params = null;
-    return (SelectResults) this.executeQuery(query, params, getBucketSet());
+    return (SelectResults) this.executeQuery(query, executionContext, params, 
getBucketSet());
   }
 
   @Override
@@ -170,7 +173,9 @@ public class LocalDataSet implements Region, QueryExecutor {
    * MULTI REGION PR BASED QUERIES.
    */
   @Override
-  public Object executeQuery(DefaultQuery query, Object[] parameters, Set 
buckets)
+  public Object executeQuery(DefaultQuery query,
+      final ExecutionContext executionContext,
+      Object[] parameters, Set buckets)
       throws FunctionDomainException, TypeMismatchException, 
NameResolutionException,
       QueryInvocationTargetException {
     long startTime = 0L;
@@ -183,7 +188,7 @@ public class LocalDataSet implements Region, QueryExecutor {
     QueryObserver indexObserver = query.startTrace();
 
     try {
-      result = this.proxy.executeQuery(query, parameters, buckets);
+      result = this.proxy.executeQuery(query, executionContext, parameters, 
buckets);
     } finally {
       query.endTrace(indexObserver, startTime, result);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
index 4467ee6..4182a38 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PRQueryProcessor.java
@@ -233,7 +233,6 @@ public class PRQueryProcessor {
       throws ForceReattemptException, QueryInvocationTargetException, 
QueryException {
     // Check if QueryMonitor is enabled, if so add query to be monitored.
     QueryMonitor queryMonitor = null;
-    context.setCqQueryContext(query.isCqQuery());
     if (GemFireCacheImpl.getInstance() != null) {
       queryMonitor = GemFireCacheImpl.getInstance().getQueryMonitor();
     }
@@ -241,7 +240,7 @@ public class PRQueryProcessor {
     try {
       if (queryMonitor != null) {
         // Add current thread to be monitored by QueryMonitor.
-        queryMonitor.monitorQueryThread(query);
+        queryMonitor.monitorQueryExecution(context);
       }
 
       Object results = query.executeUsingContext(context);
@@ -270,7 +269,7 @@ public class PRQueryProcessor {
       throw qe;
     } finally {
       if (queryMonitor != null) {
-        queryMonitor.stopMonitoringQueryThread(query);
+        queryMonitor.stopMonitoringQueryExecution(context);
       }
     }
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a94a09c..77beb80 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -1878,12 +1878,15 @@ public class PartitionedRegion extends LocalRegion
    * @since GemFire 5.1
    */
   @Override
-  public Object executeQuery(DefaultQuery query, Object[] parameters, Set 
buckets)
+  public Object executeQuery(final DefaultQuery query,
+      final ExecutionContext executionContext,
+      final Object[] parameters,
+      final Set buckets)
       throws FunctionDomainException, TypeMismatchException, 
NameResolutionException,
       QueryInvocationTargetException {
     for (;;) {
       try {
-        return doExecuteQuery(query, parameters, buckets);
+        return doExecuteQuery(query, executionContext, parameters, buckets);
       } catch (ForceReattemptException ignore) {
         // fall through and loop
       }
@@ -1895,7 +1898,10 @@ public class PartitionedRegion extends LocalRegion
    *
    * @throws ForceReattemptException if one of the buckets moved out from 
under us
    */
-  private Object doExecuteQuery(DefaultQuery query, Object[] parameters, Set 
buckets)
+  private Object doExecuteQuery(final DefaultQuery query,
+      final ExecutionContext executionContext,
+      final Object[] parameters,
+      final Set buckets)
       throws FunctionDomainException, TypeMismatchException, 
NameResolutionException,
       QueryInvocationTargetException, ForceReattemptException {
     if (logger.isDebugEnabled()) {
@@ -1948,7 +1954,7 @@ public class PartitionedRegion extends LocalRegion
     SelectResults results = selectExpr.getEmptyResultSet(parameters, 
getCache(), query);
 
     PartitionedRegionQueryEvaluator prqe = new 
PartitionedRegionQueryEvaluator(this.getSystem(),
-        this, query, parameters, results, allBuckets);
+        this, query, executionContext, parameters, results, allBuckets);
     for (;;) {
       this.getCancelCriterion().checkCancelInProgress(null);
       boolean interrupted = Thread.interrupted();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index f584b99..45092ed 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -119,6 +119,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
   private final PartitionedRegion pr;
   private volatile Map<InternalDistributedMember, List<Integer>> 
node2bucketIds;
   private final DefaultQuery query;
+  private final ExecutionContext executionContext;
   private final Object[] parameters;
   private SelectResults cumulativeResults;
   /**
@@ -141,12 +142,17 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
    * @param parameters the parameters for executing the query
    * @param cumulativeResults where to add the results as they come in
    */
-  public PartitionedRegionQueryEvaluator(InternalDistributedSystem sys, 
PartitionedRegion pr,
-      DefaultQuery query, Object[] parameters, SelectResults cumulativeResults,
-      Set<Integer> bucketsToQuery) {
+  public PartitionedRegionQueryEvaluator(final InternalDistributedSystem sys,
+      final PartitionedRegion pr,
+      final DefaultQuery query,
+      final ExecutionContext executionContext,
+      final Object[] parameters,
+      final SelectResults cumulativeResults,
+      final Set<Integer> bucketsToQuery) {
     super(sys, pr.getPRId());
     this.pr = pr;
     this.query = query;
+    this.executionContext = executionContext;
     this.parameters = parameters;
     this.cumulativeResults = cumulativeResults;
     this.bucketsToQuery = bucketsToQuery;
@@ -209,7 +215,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
       if (traceObject instanceof PRQueryTraceInfo) {
         if (DefaultQuery.testHook != null) {
           DefaultQuery.testHook
-              
.doTestHook(DefaultQuery.TestHook.SPOTS.PULL_OFF_PR_QUERY_TRACE_INFO, null);
+              
.doTestHook(DefaultQuery.TestHook.SPOTS.PULL_OFF_PR_QUERY_TRACE_INFO, null, 
null);
         }
         PRQueryTraceInfo queryTrace = (PRQueryTraceInfo) objects.remove(0);
         queryTrace.setSender(sender);
@@ -228,7 +234,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
     }
 
     synchronized (results) {
-      if (!QueryMonitor.isLowMemory() && !this.query.isCanceled()) {
+      if (!QueryMonitor.isLowMemory() && !this.executionContext.isCanceled()) {
         results.add(objects);
       } else {
         if (logger.isDebugEnabled()) {
@@ -237,11 +243,11 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
         if (QueryMonitor.isLowMemory()) {
           String reason =
               "Query execution canceled due to low memory while gathering 
results from partitioned regions";
-          query.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
+          executionContext.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
         } else {
           if (logger.isDebugEnabled()) {
             logger.debug("query cancelled while gathering results, aborting 
due to exception "
-                + query.getQueryCanceledException());
+                + executionContext.getQueryCanceledException());
           }
         }
         return false;
@@ -421,8 +427,8 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
       }
     }
 
-    if (query.isCanceled()) {
-      throw query.getQueryCanceledException();
+    if (executionContext.isCanceled()) {
+      throw executionContext.getQueryCanceledException();
     }
 
     if (localFault != null) {
@@ -641,7 +647,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
 
     if (DefaultQuery.testHook != null) {
       DefaultQuery.testHook
-          
.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_BUILD_CUMULATIVE_RESULT, null);
+          
.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_BUILD_CUMULATIVE_RESULT, null, 
null);
     }
 
     boolean localResults = false;
@@ -737,7 +743,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
     if (prQueryTraceInfoList != null && this.query.isTraced() && 
logger.isInfoEnabled()) {
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook
-            
.doTestHook(DefaultQuery.TestHook.SPOTS.CREATE_PR_QUERY_TRACE_STRING, null);
+            
.doTestHook(DefaultQuery.TestHook.SPOTS.CREATE_PR_QUERY_TRACE_STRING, null, 
null);
       }
       StringBuilder sb = new StringBuilder();
       sb.append(String.format("Trace Info for Query: %s",
@@ -759,19 +765,18 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
     if (QueryMonitor.isLowMemory()) {
       String reason =
           "Query execution canceled due to low memory while gathering results 
from partitioned regions";
-      query.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
+      executionContext.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook
-            
.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_THROW_QUERY_CANCELED_EXCEPTION, 
null);
+            
.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_THROW_QUERY_CANCELED_EXCEPTION, 
null,
+                null);
       }
-      throw query.getQueryCanceledException();
-    } else if (query.isCanceled()) {
-      throw query.getQueryCanceledException();
+      throw executionContext.getQueryCanceledException();
+    } else if (executionContext.isCanceled()) {
+      throw executionContext.getQueryCanceledException();
     }
   }
 
-
-
   /**
    * Adds all counts from all member buckets to cumulative results.
    *
@@ -989,7 +994,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
           if (DefaultQuery.testHook != null) {
             DefaultQuery.testHook
                 
.doTestHook(DefaultQuery.TestHook.SPOTS.CREATE_PR_QUERY_TRACE_INFO_FROM_LOCAL_NODE,
-                    null);
+                    null, null);
           }
           PRQueryTraceInfo queryTraceInfo = new PRQueryTraceInfo();
           
queryTraceInfo.setNumResults(queryTraceInfo.calculateNumberOfResults(resultCollector));
@@ -1102,7 +1107,7 @@ public class PartitionedRegionQueryEvaluator extends 
StreamingPartitionOperation
         if (m.isCanceled()) {
           String reason =
               "Query execution canceled due to low memory while gathering 
results from partitioned regions";
-          query.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
+          executionContext.setQueryCanceledException(new 
QueryExecutionLowMemoryException(reason));
           this.abort = true;
         }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 21f1c9a..06251e4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -32,8 +32,10 @@ import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
 import org.apache.geode.cache.query.Struct;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.IndexTrackingQueryObserver;
 import org.apache.geode.cache.query.internal.PRQueryTraceInfo;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryMonitor;
 import org.apache.geode.cache.query.internal.QueryObserver;
 import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
@@ -169,6 +171,7 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
     }
 
     DefaultQuery query = new DefaultQuery(this.queryString, pr.getCache(), 
false);
+    final ExecutionContext executionContext = new QueryExecutionContext(null, 
pr.getCache(), query);
     // Remote query, use the PDX types in serialized form.
     Boolean initialPdxReadSerialized = 
pr.getCache().getPdxReadSerializedOverride();
     pr.getCache().setPdxReadSerializedOverride(true);
@@ -194,7 +197,7 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
         if (DefaultQuery.testHook != null) {
           DefaultQuery.testHook
               
.doTestHook(DefaultQuery.TestHook.SPOTS.CREATE_PR_QUERY_TRACE_INFO_FOR_REMOTE_QUERY,
-                  null);
+                  null, null);
         }
         queryTraceInfo = new PRQueryTraceInfo();
         queryTraceList = Collections.singletonList(queryTraceInfo);
@@ -214,7 +217,8 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
       if (isQueryTraced) {
         if (DefaultQuery.testHook != null) {
           DefaultQuery.testHook
-              
.doTestHook(DefaultQuery.TestHook.SPOTS.POPULATING_TRACE_INFO_FOR_REMOTE_QUERY, 
null);
+              
.doTestHook(DefaultQuery.TestHook.SPOTS.POPULATING_TRACE_INFO_FOR_REMOTE_QUERY, 
null,
+                  null);
         }
 
         // calculate the number of rows being sent
@@ -248,8 +252,8 @@ public class QueryMessage extends 
StreamingPartitionOperation.StreamingPartition
             "Query execution canceled due to memory threshold crossed in 
system, memory used: %s bytes.",
             QueryMonitor.getMemoryUsedBytes());
         throw new QueryExecutionLowMemoryException(reason);
-      } else if (query.isCanceled()) {
-        throw query.getQueryCanceledException();
+      } else if (executionContext.isCanceled()) {
+        throw executionContext.getQueryCanceledException();
       }
       super.operateOnPartitionedRegion(dm, pr, startTime);
     } finally {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
index d250141..79c2c5d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommandQuery.java
@@ -24,11 +24,14 @@ import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.operations.QueryOperationContext;
 import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryException;
+import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
+import org.apache.geode.cache.query.QueryExecutionTimeoutException;
 import org.apache.geode.cache.query.QueryInvalidException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.Struct;
 import org.apache.geode.cache.query.internal.CqEntry;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.QueryExecutionCanceledException;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
 import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
 import org.apache.geode.cache.query.internal.types.StructTypeImpl;
@@ -51,12 +54,19 @@ public abstract class BaseCommandQuery extends BaseCommand {
    *
    * @return true if successful execution false in case of failure.
    */
-  protected boolean processQuery(Message msg, Query query, String queryString, 
Set regionNames,
-      long start, ServerCQ cqQuery, QueryOperationContext queryContext, 
ServerConnection servConn,
-      boolean sendResults, final SecurityService securityService)
+  protected boolean processQuery(final Message msg,
+      final Query query,
+      final String queryString,
+      final Set regionNames,
+      final long start,
+      final ServerCQ cqQuery,
+      final QueryOperationContext queryContext,
+      final ServerConnection servConn,
+      final boolean sendResults,
+      final SecurityService securityService)
       throws IOException, InterruptedException {
-    return processQueryUsingParams(msg, query, queryString, regionNames, 
start, cqQuery,
-        queryContext, servConn, sendResults, null, securityService);
+    return processQueryUsingParams(msg, query, queryString, regionNames, start,
+        cqQuery, queryContext, servConn, sendResults, null, securityService);
   }
 
   /**
@@ -64,9 +74,16 @@ public abstract class BaseCommandQuery extends BaseCommand {
    *
    * @return true if successful execution false in case of failure.
    */
-  protected boolean processQueryUsingParams(Message msg, Query query, String 
queryString,
-      Set regionNames, long start, ServerCQ cqQuery, QueryOperationContext 
queryContext,
-      ServerConnection servConn, boolean sendResults, Object[] params,
+  protected boolean processQueryUsingParams(final Message msg,
+      final Query query,
+      final String queryString,
+      final Set regionNames,
+      long start,
+      final ServerCQ cqQuery,
+      QueryOperationContext queryContext,
+      final ServerConnection servConn,
+      final boolean sendResults,
+      final Object[] params,
       final SecurityService securityService) throws IOException, 
InterruptedException {
     ChunkedMessage queryResponseMsg = servConn.getQueryResponseMessage();
     CacheServerStats stats = servConn.getCacheServerStats();
@@ -262,9 +279,10 @@ public abstract class BaseCommandQuery extends BaseCommand 
{
       checkForInterrupt(servConn, e);
       // Otherwise, write a query response and continue
       // Check if query got canceled from QueryMonitor.
-      DefaultQuery defaultQuery = (DefaultQuery) query;
-      if ((defaultQuery).isCanceled()) {
-        e = new 
QueryException(defaultQuery.getQueryCanceledException().getMessage(),
+      if (e instanceof QueryExecutionLowMemoryException
+          || e instanceof QueryExecutionTimeoutException
+          || e instanceof QueryExecutionCanceledException) {
+        e = new QueryException(e.getMessage(),
             e.getCause());
       }
       writeQueryResponseException(msg, e, servConn);
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
index 9b1fb91..d750eb3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
@@ -34,6 +34,8 @@ import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.internal.InternalEntity;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.InternalCache;
@@ -153,7 +155,8 @@ public class QueryDataFunction implements Function, 
InternalEntity {
             }
             LocalDataSet lds = new LocalDataSet(parRegion, 
localPrimaryBucketSet);
             DefaultQuery query = (DefaultQuery) 
cache.getQueryService().newQuery(queryString);
-            results = lds.executeQuery(query, null, localPrimaryBucketSet);
+            final ExecutionContext executionContext = new 
QueryExecutionContext(null, cache, query);
+            results = lds.executeQuery(query, executionContext, null, 
localPrimaryBucketSet);
           }
         } else {
           rcollector = FunctionService.onRegion(cache.getRegion(regionName))
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/CompiledIteratorDefJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/CompiledIteratorDefJUnitTest.java
index 3336df3..0859762 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/CompiledIteratorDefJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/CompiledIteratorDefJUnitTest.java
@@ -32,7 +32,7 @@ public class CompiledIteratorDefJUnitTest {
     CompiledValue compiledValue = mock(CompiledValue.class);
     CompiledIteratorDef compiledIteratorDef =
         new CompiledIteratorDef("TestIterator", TypeUtils.OBJECT_TYPE, 
compiledValue);
-    ExecutionContext executionContext = mock(ExecutionContext.class);
+    final ExecutionContext executionContext = mock(ExecutionContext.class);
     RuntimeIterator runtimeIterator = mock(RuntimeIterator.class);
 
     when(runtimeIterator.evaluateCollection(executionContext))
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
index bf5ea42..a4f8af7 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
@@ -61,46 +61,46 @@ public class QueryMonitorTest {
   @After
   public void afterClass() {
     /*
-     * If the query cancelation task is run, it will set the queryCanceled
+     * If the query cancelation task is run, it will set the isCanceled
      * thread local on the query to true.
      *
      * We need to clean up that state between tests because they can run on 
this same thread.
      * In production, this cleanup is done in DefaultQuery after the query 
executes.
      */
-    DefaultQuery.queryCanceled.get().set(false);
+    ExecutionContext.isCanceled.get().set(false);
     monitor.setLowMemory(false, 100);
   }
 
   @Test
   public void monitorQueryThreadCqQueryIsNotMonitored() {
-    DefaultQuery query = mock(DefaultQuery.class);
-    when(query.isCqQuery()).thenReturn(true);
-    monitor.monitorQueryThread(query);
+    final ExecutionContext executionContext = mock(ExecutionContext.class);
+    when(executionContext.isCqQueryContext()).thenReturn(true);
+    monitor.monitorQueryExecution(executionContext);
 
-    // Verify that the expiration task was not scheduled for the CQ query
+    // Verify that the expiration task was not scheduled for the CQ 
executionContext
     Mockito.verify(scheduledThreadPoolExecutor, 
never()).schedule(captor.capture(), anyLong(),
         isA(TimeUnit.class));
   }
 
   @Test
   public void monitorQueryThreadLowMemoryExceptionThrown() {
-    DefaultQuery query = mock(DefaultQuery.class);
+    final ExecutionContext executionContext = mock(ExecutionContext.class);
     monitor.setLowMemory(true, 100);
 
-    assertThatThrownBy(() -> monitor.monitorQueryThread(query))
+    assertThatThrownBy(() -> monitor.monitorQueryExecution(executionContext))
         .isExactlyInstanceOf(QueryExecutionLowMemoryException.class);
   }
 
   @Test
   public void monitorQueryThreadExpirationTaskScheduled() {
-    DefaultQuery query = mock(DefaultQuery.class);
+    final ExecutionContext executionContext = mock(ExecutionContext.class);
 
-    monitor.monitorQueryThread(query);
+    monitor.monitorQueryExecution(executionContext);
     Mockito.verify(scheduledThreadPoolExecutor, 
times(1)).schedule(captor.capture(), anyLong(),
         isA(TimeUnit.class));
     captor.getValue().run();
 
-    Mockito.verify(query, times(1))
+    Mockito.verify(executionContext, times(1))
         .setQueryCanceledException(isA(QueryExecutionTimeoutException.class));
     
assertThatThrownBy(QueryMonitor::throwExceptionIfQueryOnCurrentThreadIsCanceled)
         .isExactlyInstanceOf(QueryExecutionCanceledException.class);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
index c05e279..adf36d8 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.CompiledSelect;
 import org.apache.geode.cache.query.internal.CompiledValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.LinkedResultSet;
 import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
 import org.apache.geode.distributed.internal.DistributionMessage;
@@ -90,6 +91,7 @@ public class PartitionedRegionQueryEvaluatorTest {
     when(pr.getDataStore()).thenReturn(dataStore);
     when(pr.getCache()).thenReturn(cache);
 
+
   }
 
   @Test
@@ -114,7 +116,8 @@ public class PartitionedRegionQueryEvaluatorTest {
     dataStore.setScenarios(scenarios);
 
     PartitionedRegionQueryEvaluator prqe = new 
ExtendedPartitionedRegionQueryEvaluator(system, pr,
-        query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+        query, mock(ExecutionContext.class), null, new LinkedResultSet(), 
allBucketsToQuery,
+        scenarios);
     Collection results = prqe.queryBuckets(null).asList();
     assertNotNull(results);
     assertEquals(resultsForMember1.size(), results.size());
@@ -145,7 +148,8 @@ public class PartitionedRegionQueryEvaluatorTest {
     dataStore.setScenarios(scenarios);
 
     PartitionedRegionQueryEvaluator prqe = new 
ExtendedPartitionedRegionQueryEvaluator(system, pr,
-        query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+        query, mock(ExecutionContext.class), null, new LinkedResultSet(), 
allBucketsToQuery,
+        scenarios);
     Collection results = prqe.queryBuckets(null).asList();
     List expectedResults = new LinkedList();
     expectedResults.addAll(resultsForMember1);
@@ -198,7 +202,8 @@ public class PartitionedRegionQueryEvaluatorTest {
     dataStore.setScenarios(scenarios);
 
     PartitionedRegionQueryEvaluator prqe = new 
ExtendedPartitionedRegionQueryEvaluator(system, pr,
-        query, null, new LinkedResultSet(), allBucketsToQuery, scenarios);
+        query, mock(ExecutionContext.class), null, new LinkedResultSet(), 
allBucketsToQuery,
+        scenarios);
     Collection results = prqe.queryBuckets(null).asList();
 
     List expectedResults = new LinkedList();
@@ -215,8 +220,9 @@ public class PartitionedRegionQueryEvaluatorTest {
   public void testGetAllNodesShouldBeRandomized() {
     List bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
     Set bucketSet = new HashSet(bucketList);
-    PartitionedRegionQueryEvaluator prqe = new 
PartitionedRegionQueryEvaluator(system, pr, query,
-        null, new LinkedResultSet(), bucketSet);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, 
mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
     RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
     when(regionAdvisor.adviseDataStore()).thenReturn(bucketSet);
     await()
@@ -287,11 +293,15 @@ public class PartitionedRegionQueryEvaluatorTest {
     // pass through so we can fake out the executeQuery locally
     PRQueryProcessor extendedPRQueryProcessor;
 
-    public ExtendedPartitionedRegionQueryEvaluator(InternalDistributedSystem 
sys,
-        PartitionedRegion pr, DefaultQuery query, Object[] parameters,
-        SelectResults cumulativeResults, Set<Integer> bucketsToQuery,
-        Queue<PartitionedQueryScenario> scenarios) {
-      super(sys, pr, query, parameters, cumulativeResults, bucketsToQuery);
+    public ExtendedPartitionedRegionQueryEvaluator(final 
InternalDistributedSystem sys,
+        final PartitionedRegion pr,
+        final DefaultQuery query,
+        final ExecutionContext executionContext,
+        final Object[] parameters,
+        final SelectResults cumulativeResults,
+        final Set<Integer> bucketsToQuery,
+        final Queue<PartitionedQueryScenario> scenarios) {
+      super(sys, pr, query, executionContext, parameters, cumulativeResults, 
bucketsToQuery);
       this.scenarios = scenarios;
       extendedPRQueryProcessor =
           new ExtendedPRQueryProcessor(pr, query, null, new 
LinkedList(bucketsToQuery));
diff --git 
a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
 
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
index e966aed..cd88004 100644
--- 
a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
+++ 
b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
@@ -15,6 +15,8 @@
 package org.apache.geode.cache.query.dunit;
 
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -31,6 +33,7 @@ import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.query.CqAttributes;
@@ -40,9 +43,11 @@ import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryExecutionTimeoutException;
 import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.dunit.AsyncInvocation;
@@ -117,7 +122,7 @@ public class QueryMonitorDUnitTest {
     server1.invoke(() -> populateRegion(0, 100));
 
     // execute the query
-    VMProvider.invokeInEveryMember(() -> executeQuery(), client3, client4);
+    VMProvider.invokeInEveryMember(() -> executeQueries(), client3, client4);
   }
 
   @Test
@@ -135,7 +140,7 @@ public class QueryMonitorDUnitTest {
     server1.invoke(() -> populateRegion(0, 100));
 
     // execute the query from client3
-    client3.invoke(() -> executeQuery());
+    client3.invoke(() -> executeQueries());
   }
 
   @Test
@@ -157,8 +162,8 @@ public class QueryMonitorDUnitTest {
     server1.invoke(() -> populateRegion(0, 100));
     server2.invoke(() -> populateRegion(100, 200));
 
-    client3.invoke(() -> executeQuery());
-    client4.invoke(() -> executeQuery());
+    client3.invoke(() -> executeQueries());
+    client4.invoke(() -> executeQueries());
   }
 
   @Test
@@ -170,7 +175,7 @@ public class QueryMonitorDUnitTest {
     server1.invoke(() -> populateRegion(0, 100));
 
     // execute the query from one server
-    server1.invoke(() -> executeQuery());
+    server1.invoke(() -> executeQueries());
 
     // Create index and Perform cache op. Bug#44307
     server1.invoke(() -> {
@@ -194,8 +199,8 @@ public class QueryMonitorDUnitTest {
     server2.invoke(() -> populateRegion(200, 300));
 
     // execute the query from one server
-    server1.invoke(() -> executeQuery());
-    server2.invoke(() -> executeQuery());
+    server1.invoke(() -> executeQueries());
+    server2.invoke(() -> executeQueries());
   }
 
   @Test
@@ -217,8 +222,8 @@ public class QueryMonitorDUnitTest {
     client4 = cluster.startClientVM(4, new Properties(), ccf -> {
       configureClientCacheFactory(ccf, server2Port);
     });
-    client3.invoke(() -> executeQuery());
-    client4.invoke(() -> executeQuery());
+    client3.invoke(() -> executeQueries());
+    client4.invoke(() -> executeQueries());
   }
 
   @Test
@@ -250,8 +255,8 @@ public class QueryMonitorDUnitTest {
         cluster.startClientVM(4,
             c -> 
c.withPoolSubscription(true).withServerConnection(server2Port));
 
-    client3.invoke(() -> executeQuery());
-    client4.invoke(() -> executeQuery());
+    client3.invoke(() -> executeQueries());
+    client4.invoke(() -> executeQueries());
   }
 
   @Test
@@ -358,6 +363,48 @@ public class QueryMonitorDUnitTest {
     });
   }
 
+  @Test
+  public void testQueryObjectReusableAfterFirstExecutionTimesOut() throws 
Exception {
+    server1.invoke(() -> {
+      // Setting query timeout to 10 seconds
+      GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = 10000;
+
+      final InternalCache cache = ClusterStartupRule.getCache();
+      final String regionName = "exampleRegion";
+      final RegionFactory<Integer, Integer> regionFactory =
+          cache.createRegionFactory(RegionShortcut.LOCAL);
+      final Region<Integer, Integer> exampleRegion = 
regionFactory.create(regionName);
+      final int numRegionEntries = 10;
+      for (int i = 0; i < numRegionEntries; ++i) {
+        exampleRegion.put(i, i);
+      }
+
+      final String queryString = "select * from /" + regionName;
+      final Query query = cache.getQueryService().newQuery(queryString);
+
+      // Install a test hook which causes the query to timeout
+      DefaultQuery.testHook =
+          (spot, qry, executionContext) -> {
+            if (spot != 
DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_DEPENDENCY_COMPUTATION) {
+              return;
+            }
+
+            await("stall the query execution so that it gets cancelled")
+                .until(executionContext::isCanceled);
+          };
+
+      
assertThatThrownBy(query::execute).isInstanceOf(QueryExecutionTimeoutException.class);
+
+      // Uninstall test hook so that query object is reused to execute again, 
this time successfully
+      DefaultQuery.testHook = null;
+
+      final SelectResults results = (SelectResults) query.execute();
+
+      for (int i = 0; i < numRegionEntries; ++i) {
+        assertThat(results.contains(i)).isTrue();
+      }
+    });
+  }
 
   private static void populateRegion(int startingId, int endingId) {
     Region exampleRegion = 
ClusterStartupRule.getCache().getRegion("exampleRegion");
@@ -366,7 +413,7 @@ public class QueryMonitorDUnitTest {
     }
   }
 
-  private static void executeQuery() {
+  private static void executeQueries() {
     QueryService queryService;
     if (ClusterStartupRule.getClientCache() == null) {
       queryService = ClusterStartupRule.getCache().getQueryService();
@@ -383,6 +430,20 @@ public class QueryMonitorDUnitTest {
         verifyException(e);
       }
     }
+
+    final String queryString = "SELECT DISTINCT * FROM /exampleRegion p WHERE 
p.id = $1";
+    final Query query = queryService.newQuery(queryString);
+
+    try {
+      for (int i = 0; i < 100; ++i) {
+        final Object[] params = new Object[1];
+        params[0] = i;
+
+        query.execute(params);
+      }
+    } catch (final Exception e) {
+      verifyException(e);
+    }
   }
 
   private static void configureClientCacheFactory(ClientCacheFactory ccf, 
int... serverPorts) {
@@ -436,7 +497,8 @@ public class QueryMonitorDUnitTest {
    * query execution long enough for the QueryMonitor to mark the query as 
cancelled.
    */
   private static void delayQueryTestHook(final DefaultQuery.TestHook.SPOTS 
spot,
-      final DefaultQuery query) {
+      final DefaultQuery query,
+      final ExecutionContext executionContext) {
     if (spot != 
DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_DEPENDENCY_COMPUTATION) {
       return;
     }
@@ -451,7 +513,7 @@ public class QueryMonitorDUnitTest {
      */
     await("stall the query execution so that it gets cancelled")
         .pollDelay(10, TimeUnit.MILLISECONDS)
-        .until(() -> query.isCanceled());
+        .until(() -> executionContext.isCanceled());
   }
 
   private static String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE  
p.ID > 100",
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ.java
index 069986f..fba72f1 100644
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ.java
@@ -132,8 +132,9 @@ public class ExecuteCQ extends BaseCQCommand {
         cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
       }
       ((DefaultQuery) query).setIsCqQuery(true);
-      successQuery = processQuery(clientMessage, query, cqQueryString, 
cqRegionNames, start,
-          cqQuery, executeCQContext, serverConnection, sendResults, 
securityService);
+      successQuery =
+          processQuery(clientMessage, query, cqQueryString, cqRegionNames,
+              start, cqQuery, executeCQContext, serverConnection, sendResults, 
securityService);
 
       // Update the CQ statistics.
       
cqQuery.getVsdStats().setCqInitialResultsTime(DistributionStats.getStatTime() - 
start);
diff --git 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
index f5825fc..8ae6d88 100755
--- 
a/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
+++ 
b/geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/command/ExecuteCQ61.java
@@ -179,8 +179,9 @@ public class ExecuteCQ61 extends BaseCQCommand {
           cqRegionNames = ((DefaultQuery) query).getRegionsInQuery(null);
         }
         ((DefaultQuery) query).setIsCqQuery(true);
-        successQuery = processQuery(clientMessage, query, cqQueryString, 
cqRegionNames, start,
-            cqQuery, executeCQContext, serverConnection, sendResults, 
securityService);
+        successQuery =
+            processQuery(clientMessage, query, cqQueryString, cqRegionNames,
+                start, cqQuery, executeCQContext, serverConnection, 
sendResults, securityService);
 
 
         // Update the CQ statistics.
diff --git 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
index ddd9cdc..e0e296f 100644
--- 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
+++ 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/LocatorStarterRule.java
@@ -50,7 +50,7 @@ import org.apache.geode.internal.cache.InternalCache;
  *
  * <p>
  * If you need a rule to start a server/locator in different VMs for 
Distributed tests, You should
- * use {@code LocatorServerStartupRule}.
+ * use {@code ClusterStartupRule}.
  */
 public class LocatorStarterRule extends MemberStarterRule<LocatorStarterRule> 
implements Locator {
   private transient InternalLocator locator;
diff --git 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index ae7c12b..0c8b89d 100644
--- 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++ 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -50,7 +50,7 @@ import org.apache.geode.pdx.PdxSerializer;
  *
  * <p>
  * If you need a rule to start a server/locator in different VMs for 
Distributed tests, You should
- * use {@code LocatorServerStartupRule}.
+ * use {@code ClusterStartupRule}.
  */
 public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> 
implements Server {
   private transient InternalCache cache;

Reply via email to