http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 544fc13..48658fe 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -12,9 +12,19 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.cache.query.internal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheRuntimeException; @@ -42,7 +52,7 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.CachePerfStats; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalDataSet; import org.apache.geode.internal.cache.PRQueryProcessor; import org.apache.geode.internal.cache.PartitionedRegion; @@ -50,36 +60,24 @@ import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.TXStateProxy; import org.apache.geode.internal.i18n.LocalizedStrings; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - - /** * Thread-safe implementation of org.apache.persistence.query.Query - * */ - public class DefaultQuery implements Query { + private final CompiledValue compiledQuery; + private final String queryString; - private final Cache cache; - // private Pool pool; + + private final InternalCache cache; + private ServerProxy serverProxy; protected AtomicLong numExecutions = new AtomicLong(0); - protected AtomicLong totalExecutionTime = new AtomicLong(0); - private QueryStatistics stats; - // TODO : Toggle the flag appropriately when implementing the compile() functionality - private boolean isCompiled = false; + private final AtomicLong totalExecutionTime = new AtomicLong(0); + + private final QueryStatistics stats; private boolean traceOn = false; @@ -93,21 +91,25 @@ public class DefaultQuery implements Query { * used for more than the set value. By default its set to 10 minutes, the time is set in * MilliSecs. */ - public static final int COMPILED_QUERY_CLEAR_TIME = - Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Query.COMPILED_QUERY_CLEAR_TIME", - 10 * 60 * 1000).intValue(); + public static final int COMPILED_QUERY_CLEAR_TIME = Integer.getInteger( + DistributionConfig.GEMFIRE_PREFIX + "Query.COMPILED_QUERY_CLEAR_TIME", 10 * 60 * 1000); public static int TEST_COMPILED_QUERY_CLEAR_TIME = -1; - // Use to represent null result. - // Used while adding PR results to the results-queue, which is a blocking queue. + /** + * Use to represent null result. Used while adding PR results to the results-queue, which is a + * blocking queue. + */ public static final Object NULL_RESULT = new Object(); private volatile boolean isCanceled = false; + private CacheRuntimeException canceledException; - // This is declared as array so that it can be synchronized between - // two threads to validate the state. + /** + * This is declared as array so that it can be synchronized between two threads to validate the + * state. + */ private final boolean[] queryCompletedForMonitoring = new boolean[] {false}; private ProxyCache proxyCache; @@ -116,28 +118,26 @@ public class DefaultQuery implements Query { private boolean isQueryWithFunctionContext = false; - // Holds the CQ reference. In cases of peer PRs this will be set to null - // even though isCqQuery is set to true. + /** + * Holds the CQ reference. In cases of peer PRs this will be set to null even though isCqQuery is + * set to true. + */ private InternalCqQuery cqQuery = null; private volatile boolean lastUsed = true; public static TestHook testHook; - private static final ThreadLocal<Boolean> pdxReadSerialized = new ThreadLocal() { - @Override - protected Boolean initialValue() { - return new Boolean(Boolean.FALSE); - } - }; + private static final ThreadLocal<Boolean> pdxReadSerialized = + ThreadLocal.withInitial(() -> Boolean.FALSE); - // indicates query executed remotely + /** indicates query executed remotely */ private boolean isRemoteQuery = false; // to prevent objects from getting deserialized private boolean keepSerialized = false; - public static final Set<String> reservedKeywords = new HashSet<String>(); + public static final Set<String> reservedKeywords = new HashSet<>(); static { reservedKeywords.add("hint"); @@ -230,14 +230,10 @@ public class DefaultQuery implements Query { new ThreadLocal() { @Override protected Map<String, Set<String>> initialValue() { - return new HashMap<String, Set<String>>(); + return new HashMap<>(); } }; - public static void setPdxClasstofieldsmap(Map<String, Set<String>> map) { - pdxClassToFieldsMap.set(map); - } - public static Map<String, Set<String>> getPdxClasstofieldsmap() { return pdxClassToFieldsMap.get(); } @@ -269,11 +265,11 @@ public class DefaultQuery implements Query { * * @see QueryService#newQuery */ - public DefaultQuery(String queryString, Cache cache, boolean isForRemote) { + public DefaultQuery(String queryString, InternalCache cache, boolean isForRemote) { this.queryString = queryString; QCompiler compiler = new QCompiler(); this.compiledQuery = compiler.compileQuery(queryString); - CompiledSelect cs = this.getSimpleSelect(); + CompiledSelect cs = getSimpleSelect(); if (cs != null && !isForRemote && (cs.isGroupBy() || cs.isOrderBy())) { QueryExecutionContext ctx = new QueryExecutionContext(null, cache); try { @@ -282,7 +278,7 @@ public class DefaultQuery implements Query { throw new QueryInvalidException("", qe); } } - this.traceOn = (compiler.isTraceRequested() || QUERY_VERBOSE); + this.traceOn = compiler.isTraceRequested() || QUERY_VERBOSE; this.cache = cache; this.stats = new DefaultQueryStatistics(); } @@ -295,7 +291,7 @@ public class DefaultQuery implements Query { pdxReadSerialized.set(readSerialized); } - /* + /** * helper method for setPdxReadSerialized */ public static void setPdxReadSerialized(Cache cache, boolean readSerialized) { @@ -304,20 +300,20 @@ public class DefaultQuery implements Query { } } - /** * Get statistics information for this query. */ + @Override public QueryStatistics getStatistics() { - return stats; + return this.stats; } - + @Override public String getQueryString() { return this.queryString; } - + @Override public Object execute() throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { return execute(EMPTY_ARRAY); @@ -326,43 +322,43 @@ public class DefaultQuery implements Query { /** * namespace or parameters can be null */ - public Object execute(Object[] parameters) throws FunctionDomainException, TypeMismatchException, + @Override + public Object execute(Object[] params) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { // Local Query. - if (parameters == null) { + if (params == null) { throw new IllegalArgumentException( LocalizedStrings.DefaultQuery_PARAMETERS_CANNOT_BE_NULL.toLocalizedString()); } - // If pool is associated with the Query; execute the query on pool. - // ServerSide query. + // If pool is associated with the Query; execute the query on pool. ServerSide query. if (this.serverProxy != null) { // Execute Query using pool. - return executeOnServer(parameters); + return executeOnServer(params); } long startTime = 0L; - Object result = null; if (this.traceOn && this.cache != null) { startTime = NanoTimer.getTime(); } QueryObserver indexObserver = null; QueryMonitor queryMonitor = null; - QueryExecutor qe = checkQueryOnPR(parameters); + QueryExecutor qe = checkQueryOnPR(params); + Object result = null; try { - // Setting the readserialized flag for local queries - setPdxReadSerialized(cache, true); - ExecutionContext context = new QueryExecutionContext(parameters, this.cache, this); + // Setting the readSerialized flag for local queries + setPdxReadSerialized(this.cache, true); + ExecutionContext context = new QueryExecutionContext(params, this.cache, this); indexObserver = this.startTrace(); if (qe != null) { if (DefaultQuery.testHook != null) { DefaultQuery.testHook.doTestHook(1); } - result = qe.executeQuery(this, parameters, null); + result = qe.executeQuery(this, params, null); // For local queries returning pdx objects wrap the resultset with // ResultsCollectionPdxDeserializerWrapper // which deserializes these pdx objects. @@ -375,10 +371,8 @@ public class DefaultQuery implements Query { return result; } - // Get QueryMonitor. - if (GemFireCacheImpl.getInstance() != null) { - queryMonitor = GemFireCacheImpl.getInstance().getQueryMonitor(); - } + queryMonitor = this.cache.getQueryMonitor(); + // If QueryMonitor is enabled add query to be monitored. if (queryMonitor != null) { // Add current thread to be monitored by QueryMonitor. @@ -422,55 +416,44 @@ public class DefaultQuery implements Query { } } return result; - } catch (QueryExecutionCanceledException e) { + } catch (QueryExecutionCanceledException ignore) { // query execution canceled exception will be thrown from the QueryMonitor // canceled exception should not be null at this point as it should be set // when query is canceled. - if (canceledException != null) { - throw canceledException; + if (this.canceledException != null) { + throw this.canceledException; } else { throw new QueryExecutionCanceledException( "Query was canceled. It may be due to low memory or the query was running longer than the MAX_QUERY_EXECUTION_TIME."); } } finally { - setPdxReadSerialized(cache, false); + setPdxReadSerialized(this.cache, false); if (queryMonitor != null) { queryMonitor.stopMonitoringQueryThread(Thread.currentThread(), this); } this.endTrace(indexObserver, startTime, result); } - } - // For Order by queries ,since they are already ordered by the comparator - // && it takes care of conversion, we do not have to wrap it in a wrapper - public boolean needsPDXDeserializationWrapper(boolean isQueryOnPR) { - if (!isRemoteQuery() && !this.cache.getPdxReadSerialized()) { - return true; - /* - * if(isQueryOnPR) { // if the query is on PR we need a top level pdx deserialization wrapper - * only in case of //order by query or non distinct query CompiledSelect cs = - * this.getSimpleSelect(); if(cs != null) { return cs.getOrderByAttrs() != null ; }else { - * return true; } }else { return true; } - */ - } else { - return false; - } + /** + * For Order by queries ,since they are already ordered by the comparator && it takes care of + * conversion, we do not have to wrap it in a wrapper + */ + private boolean needsPDXDeserializationWrapper(boolean isQueryOnPR) { + return !isRemoteQuery() && !this.cache.getPdxReadSerialized(); } private Object executeOnServer(Object[] parameters) { long startTime = CachePerfStats.getStatTime(); Object result = null; try { - if (proxyCache != null) { + if (this.proxyCache != null) { if (this.proxyCache.isClosed()) { throw new CacheClosedException("Cache is closed for this user."); } UserAttributes.userAttributes.set(this.proxyCache.getUserAttributes()); } result = this.serverProxy.query(this.queryString, parameters); - // } catch (QueryExecutionCanceledException e) { - // throw canceledException; } finally { UserAttributes.userAttributes.set(null); long endTime = CachePerfStats.getStatTime(); @@ -491,7 +474,6 @@ public class DefaultQuery implements Query { } long startTime = 0L; - Object result = null; if (this.traceOn && this.cache != null) { startTime = NanoTimer.getTime(); } @@ -514,12 +496,9 @@ public class DefaultQuery implements Query { context.setBucketRegion(pr, bukRgn); context.setCqQueryContext(this.isCqQuery); - // Check if QueryMonitor is eabled, if enabled add query to be monitored. - QueryMonitor queryMonitor = null; + // Check if QueryMonitor is enabled, if enabled add query to be monitored. + QueryMonitor queryMonitor = this.cache.getQueryMonitor(); - if (GemFireCacheImpl.getInstance() != null) { - queryMonitor = GemFireCacheImpl.getInstance().getQueryMonitor(); - } // PRQueryProcessor executes the query using single thread(in-line) or ThreadPool. // In case of threadPool each individual threads needs to be added into // QueryMonitor Service. @@ -528,6 +507,7 @@ public class DefaultQuery implements Query { queryMonitor.monitorQueryThread(Thread.currentThread(), this); } + Object result = null; try { result = executeUsingContext(context); } finally { @@ -545,8 +525,7 @@ public class DefaultQuery implements Query { String queryVerboseMsg = DefaultQuery.getLogMessage(indexObserver, startTime, otherObserver, resultSize, this.queryString, bukRgn); - if (this.traceOn && this.cache != null) { - + if (this.traceOn) { if (this.cache.getLogger().fineEnabled()) { this.cache.getLogger().fine(queryVerboseMsg); } @@ -555,20 +534,20 @@ public class DefaultQuery implements Query { return result; } - public Object executeUsingContext(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { QueryObserver observer = QueryObserverHolder.getInstance(); + long startTime = CachePerfStats.getStatTime(); TXStateProxy tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).internalSuspend(); try { observer.startQuery(this); - observer.beforeQueryEvaluation(compiledQuery, context); - Object results = null; + observer.beforeQueryEvaluation(this.compiledQuery, context); if (DefaultQuery.testHook != null) { DefaultQuery.testHook.doTestHook(6); } + Object results = null; try { // two-pass evaluation. // first pre-compute dependencies, cached in the context. @@ -577,12 +556,12 @@ public class DefaultQuery implements Query { testHook.doTestHook(1); } results = this.compiledQuery.evaluate(context); - } catch (QueryExecutionCanceledException e) { + } catch (QueryExecutionCanceledException ignore) { // query execution canceled exception will be thrown from the QueryMonitor // canceled exception should not be null at this point as it should be set // when query is canceled. - if (canceledException != null) { - throw canceledException; + if (this.canceledException != null) { + throw this.canceledException; } else { throw new QueryExecutionCanceledException( "Query was canceled. It may be due to low memory or the query was running longer than the MAX_QUERY_EXECUTION_TIME."); @@ -591,8 +570,6 @@ public class DefaultQuery implements Query { observer.afterQueryEvaluation(results); } return results; - // } catch (QueryExecutionCanceledException e) { - // throw canceledException; } finally { observer.endQuery(); long endTime = CachePerfStats.getStatTime(); @@ -603,22 +580,18 @@ public class DefaultQuery implements Query { } } - private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException, PartitionOfflineException { - // check for PartititionedRegions. If a PartitionedRegion is referred to in the query, + // check for PartitionedRegions. If a PartitionedRegion is referred to in the query, // then the following restrictions apply: // 1) the query must be just a SELECT expression; (preceded by zero or more IMPORT statements) // 2) the first FROM clause iterator cannot contain a subquery; // 3) PR reference can only be in the first FROM clause - // QueryExecutor foundPR = null; - // Region otherRgn = null; - - List<QueryExecutor> prs = new ArrayList<QueryExecutor>(); - for (Iterator itr = getRegionsInQuery(parameters).iterator(); itr.hasNext();) { - String regionPath = (String) itr.next(); + List<QueryExecutor> prs = new ArrayList<>(); + for (final Object o : getRegionsInQuery(parameters)) { + String regionPath = (String) o; Region rgn = this.cache.getRegion(regionPath); if (rgn == null) { this.cache.getCancelCriterion().checkCancelInProgress(null); @@ -632,7 +605,9 @@ public class DefaultQuery implements Query { } if (prs.size() == 1) { return prs.get(0); - } else if (prs.size() > 1) { // colocation checks; valid for more the one PRs + } else if (prs.size() > 1) { + // colocation checks; valid for more the one PRs + // First query has to be executed in a Function. if (!this.isQueryWithFunctionContext()) { throw new UnsupportedOperationException( @@ -650,8 +625,8 @@ public class DefaultQuery implements Query { continue; } other = allPRs; - if ((((PartitionedRegion) eachPR).getColocatedByList().contains(allPRs) - || ((PartitionedRegion) allPRs).getColocatedByList().contains(eachPR))) { + if (((PartitionedRegion) eachPR).getColocatedByList().contains(allPRs) + || ((PartitionedRegion) allPRs).getColocatedByList().contains(eachPR)) { colocated = true; break; } @@ -672,6 +647,7 @@ public class DefaultQuery implements Query { LocalizedStrings.DefaultQuery_QUERY_MUST_BE_A_SIMPLE_SELECT_WHEN_REFERENCING_A_PARTITIONED_REGION .toLocalizedString()); } + // make sure the where clause references no regions Set regions = new HashSet(); CompiledValue whereClause = select.getWhereClause(); @@ -688,9 +664,11 @@ public class DefaultQuery implements Query { // the first iterator in the FROM clause must be just a reference to the Partitioned Region Iterator fromClauseIterator = fromClause.iterator(); CompiledIteratorDef itrDef = (CompiledIteratorDef) fromClauseIterator.next(); + // By process of elimination, we know that the first iterator contains a reference // to the PR. Check to make sure there are no subqueries in this first iterator itrDef.visitNodes(new CompiledValue.NodeVisitor() { + @Override public boolean visit(CompiledValue node) { if (node instanceof CompiledSelect) { throw new UnsupportedOperationException( @@ -716,8 +694,8 @@ public class DefaultQuery implements Query { // check the projections, must not reference any regions List projs = select.getProjectionAttributes(); if (projs != null) { - for (Iterator itr = projs.iterator(); itr.hasNext();) { - Object[] rawProj = (Object[]) itr.next(); + for (Object proj1 : projs) { + Object[] rawProj = (Object[]) proj1; CompiledValue proj = (CompiledValue) rawProj[1]; proj.getRegionsInQuery(regions, parameters); if (!regions.isEmpty()) { @@ -728,10 +706,9 @@ public class DefaultQuery implements Query { } } // check the orderByAttrs, must not reference any regions - List orderBys = select.getOrderByAttrs(); + List<CompiledSortCriterion> orderBys = select.getOrderByAttrs(); if (orderBys != null) { - for (Iterator itr = orderBys.iterator(); itr.hasNext();) { - CompiledValue orderBy = (CompiledValue) itr.next(); + for (CompiledSortCriterion orderBy : orderBys) { orderBy.getRegionsInQuery(regions, parameters); if (!regions.isEmpty()) { throw new UnsupportedOperationException( @@ -747,43 +724,43 @@ public class DefaultQuery implements Query { } private void updateStatistics(long executionTime) { - numExecutions.incrementAndGet(); - totalExecutionTime.addAndGet(executionTime); - ((GemFireCacheImpl) this.cache).getCachePerfStats().endQueryExecution(executionTime); + this.numExecutions.incrementAndGet(); + this.totalExecutionTime.addAndGet(executionTime); + this.cache.getCachePerfStats().endQueryExecution(executionTime); } // TODO: Implement the function. Toggle the isCompiled flag accordingly - + @Override public void compile() throws TypeMismatchException, NameResolutionException { throw new UnsupportedOperationException( LocalizedStrings.DefaultQuery_NOT_YET_IMPLEMENTED.toLocalizedString()); } - + @Override public boolean isCompiled() { - return this.isCompiled; + return false; } - public boolean isTraced() { - return traceOn; + return this.traceOn; } - class DefaultQueryStatistics implements QueryStatistics { /** * Returns the total amount of time (in nanoseconds) spent executing the query. */ + @Override public long getTotalExecutionTime() { - return totalExecutionTime.get(); + return DefaultQuery.this.totalExecutionTime.get(); } /** * Returns the total number of times the query has been executed. */ + @Override public long getNumExecutions() { - return numExecutions.get(); + return DefaultQuery.this.numExecutions.get(); } } @@ -798,7 +775,7 @@ public class DefaultQuery implements Query { */ public Set getRegionsInQuery(Object[] parameters) { Set regions = new HashSet(); - compiledQuery.getRegionsInQuery(regions, parameters); + this.compiledQuery.getRegionsInQuery(regions, parameters); return Collections.unmodifiableSet(regions); } @@ -818,13 +795,8 @@ public class DefaultQuery implements Query { } /** - * - * @return int idenitifying the limit. A value of -1 indicates that no limit is imposed or the + * @return int identifying the limit. A value of -1 indicates that no limit is imposed or the * query is not a select query - * @throws QueryInvocationTargetException - * @throws NameResolutionException - * @throws TypeMismatchException - * @throws FunctionDomainException */ public int getLimit(Object[] bindArguments) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { @@ -832,7 +804,7 @@ public class DefaultQuery implements Query { ? ((CompiledSelect) this.compiledQuery).getLimitValue(bindArguments) : -1; } - public void setServerProxy(ServerProxy serverProxy) { + void setServerProxy(ServerProxy serverProxy) { this.serverProxy = serverProxy; } @@ -845,19 +817,22 @@ public class DefaultQuery implements Query { } public CacheRuntimeException getQueryCanceledException() { - return canceledException; + return this.canceledException; } - public boolean[] getQueryCompletedForMonitoring() { + boolean[] getQueryCompletedForMonitoring() { return this.queryCompletedForMonitoring; } - public void setQueryCompletedForMonitoring(boolean value) { + // TODO: parameter value is always true + void setQueryCompletedForMonitoring(boolean value) { this.queryCompletedForMonitoring[0] = value; } /** * The query gets canceled by the QueryMonitor with the reason being specified + * <p> + * TODO: parameter isCanceled is always true */ public void setCanceled(boolean isCanceled, CacheRuntimeException canceledException) { this.isCanceled = isCanceled; @@ -888,18 +863,18 @@ public class DefaultQuery implements Query { return this.cqQuery; } - + @Override public String toString() { - StringBuffer tempBuff = new StringBuffer("Query String = "); - tempBuff.append(this.queryString); - tempBuff.append(';'); - tempBuff.append("isCancelled = "); - tempBuff.append(this.isCanceled); - tempBuff.append("; Total Executions = "); - tempBuff.append(this.numExecutions); - tempBuff.append("; Total Execution Time = "); - tempBuff.append(this.totalExecutionTime); - return tempBuff.toString(); + StringBuilder sb = new StringBuilder("Query String = "); + sb.append(this.queryString); + sb.append(';'); + sb.append("isCancelled = "); + sb.append(this.isCanceled); + sb.append("; Total Executions = "); + sb.append(this.numExecutions); + sb.append("; Total Execution Time = "); + sb.append(this.totalExecutionTime); + return sb.toString(); } void setProxyCache(ProxyCache proxyCache) { @@ -913,82 +888,78 @@ public class DefaultQuery implements Query { DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME = val; } - public static String getLogMessage(QueryObserver observer, long startTime, int resultSize, + private static String getLogMessage(QueryObserver observer, long startTime, int resultSize, String query) { - String usedIndexesString = null; - String rowCountString = null; - float time = 0.0f; - - time = (NanoTimer.getTime() - startTime) / 1.0e6f; + float time = (NanoTimer.getTime() - startTime) / 1.0e6f; + String usedIndexesString = null; if (observer != null && observer instanceof IndexTrackingQueryObserver) { IndexTrackingQueryObserver indexObserver = (IndexTrackingQueryObserver) observer; Map usedIndexes = indexObserver.getUsedIndexes(); indexObserver.reset(); - StringBuffer buf = new StringBuffer(); - buf.append(" indexesUsed("); - buf.append(usedIndexes.size()); - buf.append(")"); + StringBuilder sb = new StringBuilder(); + sb.append(" indexesUsed("); + sb.append(usedIndexes.size()); + sb.append(')'); if (usedIndexes.size() > 0) { - buf.append(":"); + sb.append(':'); for (Iterator itr = usedIndexes.entrySet().iterator(); itr.hasNext();) { Map.Entry entry = (Map.Entry) itr.next(); - buf.append(entry.getKey().toString() + entry.getValue()); + sb.append(entry.getKey()).append(entry.getValue()); if (itr.hasNext()) { - buf.append(","); + sb.append(','); } } } - usedIndexesString = buf.toString(); + usedIndexesString = sb.toString(); } else if (DefaultQuery.QUERY_VERBOSE) { usedIndexesString = " indexesUsed(NA due to other observer in the way: " - + observer.getClass().getName() + ")"; + + observer.getClass().getName() + ')'; } + String rowCountString = null; if (resultSize != -1) { - rowCountString = " rowCount = " + resultSize + ";"; + rowCountString = " rowCount = " + resultSize + ';'; } return "Query Executed in " + time + " ms;" + (rowCountString != null ? rowCountString : "") - + (usedIndexesString != null ? usedIndexesString : "") + " \"" + query + "\""; + + (usedIndexesString != null ? usedIndexesString : "") + " \"" + query + '"'; } - public static String getLogMessage(IndexTrackingQueryObserver indexObserver, long startTime, + private static String getLogMessage(IndexTrackingQueryObserver indexObserver, long startTime, String otherObserver, int resultSize, String query, BucketRegion bucket) { - String usedIndexesString = null; - String rowCountString = null; float time = 0.0f; if (startTime > 0L) { time = (NanoTimer.getTime() - startTime) / 1.0e6f; } + String usedIndexesString = null; if (indexObserver != null) { Map usedIndexes = indexObserver.getUsedIndexes(bucket.getFullPath()); - StringBuffer buf = new StringBuffer(); - buf.append(" indexesUsed("); - buf.append(usedIndexes.size()); - buf.append(")"); - if (usedIndexes.size() > 0) { - buf.append(":"); + StringBuilder sb = new StringBuilder(); + sb.append(" indexesUsed("); + sb.append(usedIndexes.size()); + sb.append(')'); + if (!usedIndexes.isEmpty()) { + sb.append(':'); for (Iterator itr = usedIndexes.entrySet().iterator(); itr.hasNext();) { Map.Entry entry = (Map.Entry) itr.next(); - buf.append(entry.getKey().toString() + "(Results: " + entry.getValue() + ", Bucket: " - + bucket.getId() + ")"); + sb.append(entry.getKey()).append("(Results: ").append(entry.getValue()) + .append(", Bucket: ").append(bucket.getId()).append(")"); if (itr.hasNext()) { - buf.append(","); + sb.append(','); } } } - usedIndexesString = buf.toString(); + usedIndexesString = sb.toString(); } else if (DefaultQuery.QUERY_VERBOSE) { usedIndexesString = - " indexesUsed(NA due to other observer in the way: " + otherObserver + ")"; + " indexesUsed(NA due to other observer in the way: " + otherObserver + ')'; } - rowCountString = " rowCount = " + resultSize + ";"; - return "Query Executed" + (startTime > 0L ? " in " + time + " ms;" : ";") - + (rowCountString != null ? rowCountString : "") - + (usedIndexesString != null ? usedIndexesString : "") + " \"" + query + "\""; + String rowCountString = " rowCount = " + resultSize + ';'; + return "Query Executed" + (startTime > 0L ? " in " + time + " ms;" : ";") + rowCountString + + (usedIndexesString != null ? usedIndexesString : "") + " \"" + query + '"'; } @Override @@ -998,10 +969,9 @@ public class DefaultQuery implements Query { } @Override - public Object execute(RegionFunctionContext context, Object[] parameters) + public Object execute(RegionFunctionContext context, Object[] params) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { - Object result = null; // Supported only with RegionFunctionContext if (context == null) { @@ -1010,7 +980,7 @@ public class DefaultQuery implements Query { } this.isQueryWithFunctionContext = true; - if (parameters == null) { + if (params == null) { throw new IllegalArgumentException( LocalizedStrings.DefaultQuery_PARAMETERS_CANNOT_BE_NULL.toLocalizedString()); } @@ -1021,16 +991,16 @@ public class DefaultQuery implements Query { } QueryObserver indexObserver = null; - QueryExecutor qe = checkQueryOnPR(parameters); + QueryExecutor qe = checkQueryOnPR(params); + Object result = null; try { indexObserver = startTrace(); if (qe != null) { - Set buckets = null; LocalDataSet localDataSet = (LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context); - buckets = (localDataSet).getBucketSet(); - result = qe.executeQuery(this, parameters, buckets); + Set<Integer> buckets = localDataSet.getBucketSet(); + result = qe.executeQuery(this, params, buckets); return result; } else { // Not supported on regions other than PartitionRegion. @@ -1038,8 +1008,6 @@ public class DefaultQuery implements Query { LocalizedStrings.DefaultQuery_API_ONLY_FOR_PR.toLocalizedString()); } - // } catch (QueryExecutionCanceledException e) { - // throw canceledException; } finally { this.endTrace(indexObserver, startTime, result); } @@ -1080,7 +1048,7 @@ public class DefaultQuery implements Query { } String queryVerboseMsg = - DefaultQuery.getLogMessage(indexObserver, startTime, resultSize, queryString); + DefaultQuery.getLogMessage(indexObserver, startTime, resultSize, this.queryString); this.cache.getLogger().info(queryVerboseMsg); } } @@ -1089,13 +1057,12 @@ public class DefaultQuery implements Query { if (this.cache != null && this.cache.getLogger().infoEnabled() && this.traceOn) { int resultSize = 0; - Iterator<Collection> iterator = result.iterator(); - while (iterator.hasNext()) { - resultSize += iterator.next().size(); + for (Collection aResult : result) { + resultSize += aResult.size(); } String queryVerboseMsg = - DefaultQuery.getLogMessage(indexObserver, startTime, resultSize, queryString); + DefaultQuery.getLogMessage(indexObserver, startTime, resultSize, this.queryString); if (this.cache.getLogger().infoEnabled()) { this.cache.getLogger().info(queryVerboseMsg); } @@ -1103,7 +1070,7 @@ public class DefaultQuery implements Query { } public boolean isRemoteQuery() { - return isRemoteQuery; + return this.isRemoteQuery; } public void setRemoteQuery(boolean isRemoteQuery) { @@ -1112,33 +1079,29 @@ public class DefaultQuery implements Query { /** * set keepSerialized flag for remote queries of type 'select *' having independent operators - * - * @param cs - * @param context */ - public void keepResultsSerialized(CompiledSelect cs, ExecutionContext context) { + void keepResultsSerialized(CompiledSelect cs, ExecutionContext context) { if (isRemoteQuery()) { // for dependent iterators, deserialization is required if (cs.getIterators().size() == context.getAllIndependentIteratorsOfCurrentScope().size() && cs.getWhereClause() == null && cs.getProjectionAttributes() == null && !cs.isDistinct() && cs.getOrderByAttrs() == null) { - setKeepSerialized(true); + setKeepSerialized(); } } } public boolean isKeepSerialized() { - return keepSerialized; + return this.keepSerialized; } - private void setKeepSerialized(boolean keepSerialized) { - this.keepSerialized = keepSerialized; + private void setKeepSerialized() { + this.keepSerialized = true; } - public interface TestHook { - public void doTestHook(int spot); + void doTestHook(int spot); - public void doTestHook(String spot); + void doTestHook(String spot); } }
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java index 18fe266..6675e02 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/ExecutionContext.java @@ -14,20 +14,30 @@ */ package org.apache.geode.cache.query.internal; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; -import org.apache.geode.cache.query.*; +import org.apache.geode.cache.query.AmbiguousNameException; +import org.apache.geode.cache.query.NameResolutionException; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.TypeMismatchException; import org.apache.geode.cache.query.internal.index.IndexManager; +import org.apache.geode.cache.query.internal.index.IndexUtils; import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes; -import org.apache.geode.cache.query.internal.types.*; +import org.apache.geode.cache.query.internal.types.TypeUtils; import org.apache.geode.internal.Assert; -import org.apache.geode.cache.query.internal.index.IndexUtils; -import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.BucketRegion; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.CachePerfStats; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.pdx.internal.PdxString; @@ -37,13 +47,14 @@ import org.apache.geode.pdx.internal.PdxString; * clauses or index expressions to from clause iterators. * * @see QueryExecutionContext for extended version of this ONLY for querying. - * */ public class ExecutionContext { - protected Object[] bindArguments; + Object[] bindArguments; + private final Stack scopes = new Stack(); - private final Cache cache; + + private final InternalCache cache; /** * a Sequentially increasing number identifying a scope & also indicating whether a given scope @@ -56,23 +67,22 @@ public class ExecutionContext { * Dependency graph. Maps CompiledValues in tree to the RuntimeIterators each node is dependent * on. This information is computed just before the query is evaluated. The information is good * for only one execution, since regions can be destroyed and re-created with different type - * constraints. Type of this map: map <CompiledValue, set <RuntimeIterator>> + * constraints. Type of this map: map <CompiledValue, set <RuntimeIterator>> */ - Map dependencyGraph = new HashMap(); + private Map dependencyGraph = new HashMap(); + /** * Map which stores the CompiledIteratorDef as the key & the value is the set of Independent * RuntimeIterators on which it is dependent upon. The idea is that this Map will identify the - * final Independent RuntimeIterator or Iterators , ie. those refering to a Region or - * BindArgument, on which the CompiledIteratorDef depends upon . TODO:Asif: For a single vale , - * should we still use a Set? - * + * final Independent RuntimeIterator or Iterators , ie. those referring to a Region or + * BindArgument, on which the CompiledIteratorDef depends upon . */ private final Map itrDefToIndpndtRuntimeItrMap = new HashMap(); + /** - * Asif : This Map will store its Region Path String against an Independent RuntimeIterator An - * entry in this Map will be only for those RuntimeIteartors which have an underlying Region as - * its Collection Expression - * + * This Map will store its Region Path String against an Independent RuntimeIterator An entry in + * this Map will be only for those RuntimeIterators which have an underlying Region as its + * Collection Expression */ private final Map indpndtItrToRgnMap = new HashMap(); @@ -89,23 +99,13 @@ public class ExecutionContext { * * @see org.apache.geode.cache.Region#query */ - public ExecutionContext(Object[] bindArguments, Cache cache) { - this.bindArguments = bindArguments; - this.cache = cache; - } - - public ExecutionContext(Object[] bindArguments, Cache cache, SelectResults results) { - this.bindArguments = bindArguments; - this.cache = cache; - } - - public ExecutionContext(Object[] bindArguments, Cache cache, Query query) { + public ExecutionContext(Object[] bindArguments, InternalCache cache) { this.bindArguments = bindArguments; this.cache = cache; } public CachePerfStats getCachePerfStats() { - return ((GemFireCacheImpl) this.cache).getCachePerfStats(); + return this.cache.getCachePerfStats(); } /** @@ -129,10 +129,8 @@ public class ExecutionContext { return ds; } - // TODO:ASIF:QUERY /** * Return true if given CompiledValue is dependent on any RuntimeIterator in current scope - * */ boolean isDependentOnCurrentScope(CompiledValue cv) { // return !getDependencySet(cv, true).isEmpty(); @@ -170,7 +168,7 @@ public class ExecutionContext { Set set = (Set) this.dependencyGraph.get(cv); if (set == null) { if (readOnly) - return Collections.EMPTY_SET; + return Collections.emptySet(); set = new HashSet(1); this.dependencyGraph.put(cv, set); } @@ -179,7 +177,7 @@ public class ExecutionContext { /** * Returns all dependencies in from this context which are reused during index update by new - * {@link ExecutionContext} for concurrent updates on indexes. + * ExecutionContext for concurrent updates on indexes. * * @return All {@link AbstractCompiledValue} dependencies. */ @@ -198,10 +196,8 @@ public class ExecutionContext { return this.bindArguments[index - 1]; } - // TODO:ASIF:Query /** bind a named iterator (to current scope) */ public void bindIterator(RuntimeIterator itr) { - // int currScopeID = this.scopes.size(); QScope currentScope = this.currentScope(); int currScopeID = currentScope.getScopeID(); itr.setScopeID(currScopeID); @@ -212,10 +208,8 @@ public class ExecutionContext { CompiledValue value = resolveAsVariable(name); if (value != null) return value; - // attribute name or operation name (no args) of a variable in the current - // scope - // when there is no ambiguity, i.e. this property name belongs to only one - // variable in the scope + // attribute name or operation name (no args) of a variable in the current scope when there is + // no ambiguity, i.e. this property name belongs to only one variable in the scope value = resolveImplicitPath(name); if (value == null) // cannot be resolved @@ -246,11 +240,9 @@ public class ExecutionContext { } /** - * - * @return int indentifying the scope ID which can be assosciated with the scope + * @return the scope ID which can be associated with the scope */ - int assosciateScopeID() { - // this.scopeIDMap.put(cs, Integer.valueOf(num)); + int associateScopeID() { return ++this.scopeNum; } @@ -271,11 +263,10 @@ public class ExecutionContext { * argument . Also the self independent Runtime Iterator present in the scope ( that is teh * RuntimeIterator same as the independent iterator passed as argument) is added at start of the * list. If an iterator is dependent on more than one independent iterator, it is not added to the - * List TODO:Asif If we are storing a single Iterator instead of Set , in the - * itrDefToIndpndtRuntimeItrMap , we need to take care of this function. - * - * <P> - * author Asif + * List + * <p> + * TODO: If we are storing a single Iterator instead of Set , in the itrDefToIndpndtRuntimeItrMap + * , we need to take care of this function. * * @param rIter Independent RuntimeIterator on which dependent iterators of current scope need to * identified @@ -297,27 +288,13 @@ public class ExecutionContext { return list; } - public List getAllIterators() { - int numScopes = scopes.size(); - List iterators = new ArrayList(); - for (int i = 1; i <= numScopes; i++) { - iterators.addAll(((QScope) scopes.get(numScopes - i)).getIterators()); - } - return iterators; - } - void setOneIndexLookup(boolean b) { QScope scope = currentScope(); Support.Assert(scope != null, "must be called within valid scope"); scope._oneIndexLookup = b; } - - void setCurrent(RuntimeIterator iter, Object obj) { - currentScope().setCurrent(iter, obj); - } - - public Cache getCache() { + public InternalCache getCache() { return this.cache; } @@ -336,7 +313,6 @@ public class ExecutionContext { */ RuntimeIterator resolveImplicitOperationName(String name, int numArgs, boolean mustBeMethod) throws AmbiguousNameException { - // System.out.println("In resolveImplicitOperationName"); // iterate through all properties of iterator variables in scope // to see if there is a unique resolution RuntimeIterator oneUnknown = null; @@ -353,8 +329,8 @@ public class ExecutionContext { if (scope.getLimit() == itr) { continue NEXT_SCOPE; // don't go any farther in this scope } - // Shobhit: If Element type is ObjectType then we don't need to - // apply reflection to find out field or method. This save lot of CPU time. + // If Element type is ObjectType then we don't need to apply reflection to find out field or + // method. This save lot of CPU time. if (!TypeUtils.OBJECT_TYPE.equals(itr.getElementType()) && itr.containsProperty(name, numArgs, mustBeMethod)) { hits.add(itr); @@ -368,14 +344,15 @@ public class ExecutionContext { } } } - if (hits.size() == 1) + if (hits.size() == 1) { return (RuntimeIterator) hits.get(0); + } if (hits.size() > 1) { // ambiguous if (mustBeMethod) throw new AmbiguousNameException( LocalizedStrings.ExecutionContext_METHOD_NAMED_0_WITH_1_ARGUMENTS_IS_AMBIGUOUS_BECAUSE_IT_CAN_APPLY_TO_MORE_THAN_ONE_VARIABLE_IN_SCOPE - .toLocalizedString(new Object[] {name, Integer.valueOf(numArgs)})); + .toLocalizedString(name, numArgs)); throw new AmbiguousNameException( LocalizedStrings.ExecutionContext_ATTRIBUTE_NAMED_0_IS_AMBIGUOUS_BECAUSE_IT_CAN_APPLY_TO_MORE_THAN_ONE_VARIABLE_IN_SCOPE .toLocalizedString(name)); @@ -387,25 +364,13 @@ public class ExecutionContext { return oneUnknown; } - protected CompiledValue resolveScopeVariable(String name) { - CompiledValue value = null; - for (int i = scopes.size() - 1; i >= 0; i--) { - QScope scope = (QScope) scopes.get(i); - value = scope.resolve(name); - if (value != null) - break; - } - return value; - } - /** * Tries to find for RuntimeIterator associated with specified expression */ public RuntimeIterator findRuntimeIterator(CompiledValue expr) { // Check if expr is itself RuntimeIterator if (expr instanceof RuntimeIterator) { - RuntimeIterator rIter = (RuntimeIterator) expr; - return rIter; + return (RuntimeIterator) expr; } // Try to find RuntimeIterator return (RuntimeIterator) findIterator(expr); @@ -427,9 +392,8 @@ public class ExecutionContext { CompiledOperation operation = (CompiledOperation) path; CompiledValue rec = operation.getReceiver(this); if (rec == null) { - RuntimeIterator rcvrItr = resolveImplicitOperationName(operation.getMethodName(), + return resolveImplicitOperationName(operation.getMethodName(), operation.getArguments().size(), true); - return rcvrItr; } return findIterator(rec); } @@ -442,44 +406,29 @@ public class ExecutionContext { CompiledValue expr = resolve(((CompiledID) path).getId()); return findIterator(expr); } // if we get these exceptions return null - } catch (TypeMismatchException e) { - } catch (NameResolutionException e) { + } catch (TypeMismatchException | NameResolutionException ignore) { } return null; } - int getScopeCount() { - return this.scopes.size(); - } - /** - * * Calculates set of Runtime Iterators on which a given CompiledValue ultimately depends. The * independent iterators may belong to other scopes. - * - * <P> - * author Asif/Ketan - * - * @param cv - * @param set + * <p> + * This function will populate the set to its independent RuntimeIterators. However if the + * CompiledValue happens to be a CompiledIteratorDef & if it is independent of any other + * RuntimeIterators then no addition will be done in the Set. + * <p> + * TODO: the behavior of this function will change if we modify the computeDependency function of + * the CompiledIteratorDef as in that case the Set will be added with the self RuntimeIterator ( + * if the CompiledIteratorDef is independent) which is not the case now. + * <p> + * TODO: If a CompiledIteratorDef has only one dependent RuntimeIterator should it still be stored + * in a Set or should it be a single value? */ - // Ketan - Asif:This function will populate the set to its independent - // RuntimeIterators - // However if the CompiledValue happens to be a CompiledIteratorDef & if it is - // independent of any other RuntimeIterators then no adition will be done in - // the Set - // TODO: Asif : The behaviour of this function will change if we modify the - // computeDependency - // function of the CompiledIteratorDef as in that case the Set will be added - // with the self RuntimeIterator ( if the CompiledIteratorDef is independent) - // which is - // not the case now - // TODO:Asif : If a CompiledIteratorDef has only one dependent RuntimeIterator - // should it still be - // stored in a Set or should it be a single value? - public void computeUtlimateDependencies(CompiledValue cv, Set set) { + void computeUltimateDependencies(CompiledValue cv, Set set) { Set dependencySet = this.getDependencySet(cv, true /* readOnly */); - if (dependencySet != Collections.EMPTY_SET) { + if (dependencySet != Collections.emptySet()) { Iterator iter = dependencySet.iterator(); RuntimeIterator rIter; while (iter.hasNext()) { @@ -494,29 +443,25 @@ public class ExecutionContext { } /** - * Asif : This function populates the Map itrDefToIndpndtRuntimeItrMap. It creates a Set of + * This function populates the Map itrDefToIndpndtRuntimeItrMap. It creates a Set of * RuntimeIterators to which the current CompilediteratorDef is dependent upon. Also it sets the * index_internal_id for the RuntimeIterator, which is used for calculating the canonicalized * iterator definitions for identifying the available index. * * @param itrDef CompiledIteratorDef object representing iterator in the query from clause - * @throws AmbiguousNameException - * @throws TypeMismatchException */ public void addToIndependentRuntimeItrMap(CompiledIteratorDef itrDef) throws AmbiguousNameException, TypeMismatchException, NameResolutionException { Set set = new HashSet(); - this.computeUtlimateDependencies(itrDef, set); + this.computeUltimateDependencies(itrDef, set); RuntimeIterator itr = null; String rgnPath = null; // If the set is empty then add the self RuntimeIterator to the Map. if (set.isEmpty()) { itr = itrDef.getRuntimeIterator(this); set.add(itr); - // Asif : Since it is a an independent RuntimeIterator , check if its - // Collection Expr - // boils down to a Region. If it is , we need to store the QRegion in the - // Map + // Since it is a an independent RuntimeIterator , check if its Collection Expr boils down to a + // Region. If it is , we need to store the QRegion in the Map CompiledValue startVal = QueryUtils.obtainTheBottomMostCompiledValue(itrDef.getCollectionExpr()); if (startVal.getType() == OQLLexerTokenTypes.RegionPath) { @@ -532,12 +477,10 @@ public class ExecutionContext { } this.itrDefToIndpndtRuntimeItrMap.put(itrDef, set); IndexManager mgr = null; - // Asif : Set the canonicalized index_internal_id if the condition is - // satisfied + // Set the canonicalized index_internal_id if the condition is satisfied if (set.size() == 1) { if (itr == null) { itr = (RuntimeIterator) set.iterator().next(); - // if (itr.getScopeID() == this.getScopeCount()) { if (itr.getScopeID() == this.currentScope().getScopeID()) { rgnPath = (String) this.indpndtItrToRgnMap.get(itr); } @@ -556,7 +499,6 @@ public class ExecutionContext { currItr.setIndexInternalID((mgr == null || (tempIndexID = mgr.getCanonicalizedIteratorName(itrDef.genFromClause(this))) == null) ? currItr.getInternalId() : tempIndexID); - } public List getAllIndependentIteratorsOfCurrentScope() { @@ -573,12 +515,11 @@ public class ExecutionContext { } /** - * Asif : This method returns the Region path for the independent RuntimeIterator if itr exists - * else returns null. It is the caller's responsibility to ensure that the passed Iterator is the + * This method returns the Region path for the independent RuntimeIterator if itr exists else + * returns null. It is the caller's responsibility to ensure that the passed Iterator is the * ultimate Independent Runtime Iterator or else the method may return null if the RunTimeIterator * is genuinely dependent on a Region iterator * - * @param riter * @return String containing region path */ String getRegionPathForIndependentRuntimeIterator(RuntimeIterator riter) { @@ -588,22 +529,15 @@ public class ExecutionContext { /** * Populates the independent runtime iterator map for index creation purposes. This method does * not create any canonicalized index ids etc. - * <p> - * author Asif - * - * @param itrDef - * @throws AmbiguousNameException - * @throws TypeMismatchException */ public void addToIndependentRuntimeItrMapForIndexCreation(CompiledIteratorDef itrDef) throws AmbiguousNameException, TypeMismatchException, NameResolutionException { Set set = new HashSet(); - this.computeUtlimateDependencies(itrDef, set); - RuntimeIterator itr = null; + this.computeUltimateDependencies(itrDef, set); // If the set is empty then add the self RuntimeIterator to the Map. if (set.isEmpty()) { - itr = itrDef.getRuntimeIterator(this); + RuntimeIterator itr = itrDef.getRuntimeIterator(this); set.add(itr); } this.itrDefToIndpndtRuntimeItrMap.put(itrDef, set); @@ -637,11 +571,7 @@ public class ExecutionContext { return this.pr; } - // General purpose caching methods for data that is only valid for one - // query execution - void cachePut(Object key, Object value) { - // throw new UnsupportedOperationException("Method should not have been called"); - } + void cachePut(Object key, Object value) {} public Object cacheGet(Object key) { return null; @@ -683,14 +613,6 @@ public class ExecutionContext { throw new UnsupportedOperationException("Method should not have been called"); } - public void addToSuccessfulBuckets(int bId) { - throw new UnsupportedOperationException("Method should not have been called"); - } - - public int[] getSuccessfulBuckets() { - throw new UnsupportedOperationException("Method should not have been called"); - } - public PdxString getSavedPdxString(int index) { throw new UnsupportedOperationException("Method should not have been called"); } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexConditioningHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexConditioningHelper.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexConditioningHelper.java new file mode 100644 index 0000000..75ce930 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexConditioningHelper.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.geode.cache.query.types.StructType; + +/** + * This is a helper class which provides information on how an index data be used so as to make it + * compatible with the query. + */ +class IndexConditioningHelper { + + /** + * boolean if true implies that the index results need to be iterated so as to make it compatible + * with from clause. Shuffling may be needed for any of the following reasons: 1) Match level not + * zero ( implying index result expansion or cutdown) 2) Match level zero , but the relative + * positions of iterators in the List of iterators for the group not matching the positions in the + * index result StructBag 3) Iter operand is not null. * + * + */ + // If shuffling is not needed , then it can be bcoz of two reasons + // 1) The Index results is a ResultSet & match level is zero ( in that case we + // don't have to do anything) + // 2) The Index results is a StructBag with match level as zero & inddex + // fields matching + // the order of RuntimeIterators. In that case we just have to change the + // StructType of the StructBag + boolean shufflingNeeded = true; + + /** + * An arary of RuntimeIterators whose size is equal to the number of fields in the Index results. + * It identifies the RuntimeIterator for the field in the Index Results. Thus the Runtime Iterator + * at position 0 will be that for field 0 in the index result & so on. For those index fields + * which do not have a Runtime Iterator assosciated , the value is null (This is the case if index + * results require cut down) + */ + RuntimeIterator[] indexFieldToItrsMapping = null; + + /** + * The List containing RuntimeIterators to which the index results need to be expanded This will + * usually be Final List of RuntimeIterators - RuntimeIteratosr already accounted for in the index + * results + */ + // The default is initialized as empty List rather than null to avoid + // Null Pointer Exception in the function + // getconditionedRelationshipIndexResults + List expansionList = Collections.emptyList(); + + /** + * The List containing RuntimeIterators which define the final SelectResults after the relevant + * expansion/cutdown of index results + */ + // Though in case of single index usage , if no shuffling is needed ( + // exact match) we + // do not need finalList , but it is used in relation ship index , even if + // match level is zero. + // So we should never leave it as null + List finalList = null; + + /** + * This is the List of RuntimeIterators which gets created only if the index resulst require a + * cutdown. In such cases , it identifies those Runtime Iterators of Index Results which will be + * selected to form the result tuple. The RuntimeIterators in this List will have corresponding + * fields in the resultset obtained from Index usage. This List will be populated only if there + * exists fields in index resultset which will not be selected.If all the fields of index + * resultset will be used , then this List should be null or empty. It is used in preventing + * unnecessary expansion of same type, when a similar expansion has already occured. as for eg + * + * consider a index result containing 3 fields field1 field2 & field3 . Assume that field3 is for + * cutdown. Since the expansion iterators can either be independent of all the fields in the index + * result or at the max be dependent on field1 & field2, we should expand for a given combination + * of field1 & field2 , only once ( as we have resulst as Set, we can only have unique entries) + * ie. suppose a index result tuple has values ( 1,2 , 3 ) & ( 1,2,4) , we should expand only once + * ( as field with value 3 & 4 are to be discarded). + */ + /* + * Below Can be null or empty collections if the match level is exact & no shuffling needed + */ + List checkList = null; + + /** + * This field is meaningful iff the match level is zero, no shuffling needed & there exists a + * StructBag (& not a ResultBag) + */ + StructType structType = null; + + /** + * Independent Iterator for the Group to which the Path expression belongs to + */ + RuntimeIterator indpndntItr = null; + + /** + * Indexnfo object for the path expression + */ + IndexInfo indxInfo = null; + + IndexConditioningHelper(IndexInfo indexInfo, ExecutionContext context, int indexFieldsSize, + boolean completeExpansion, CompiledValue iterOperands, RuntimeIterator grpIndpndntItr) { + /* + * First obtain the match level of index resultset. If the match level happens to be zero , this + * implies that we just have to change the StructType ( again if only the Index resultset is a + * StructBag). If the match level is zero & expand to to top level flag is true & iff the total + * no. of iterators in current scope is greater than the no. of fields in StructBag , then only + * we need to do any expansion. The grpIndpndtItr passed can be null if the where clause + * comprises of just this condition. However if it is invoked from GroupJunction , it will be + * not null + * + */ + this.indxInfo = indexInfo; + List grpItrs = null; + int size = indexInfo.mapping.length; + this.indpndntItr = grpIndpndntItr; + this.indexFieldToItrsMapping = new RuntimeIterator[indexFieldsSize]; + // Obtain the grpIndpndt iterator if it is passed as null + if (this.indpndntItr == null) { + Set set1 = new HashSet(); + context.computeUltimateDependencies(indexInfo._path, set1); + Support.Assert(set1.size() == 1, + " Since we are in Indexed Evaluate that means there has to be exactly one independent iterator for this compiled comparison"); + // The ultimate independent RuntimeIterator + this.indpndntItr = (RuntimeIterator) set1.iterator().next(); + Support.Assert( + this.indpndntItr.getScopeID() == context.currentScope() + .getScopeID()/* context.getScopeCount() */, + " Since we are in Indexed Evaluate that means the current scope count & indpenedent iterator's scope count should match"); + } + if (indexInfo._matchLevel == 0 + && (!completeExpansion || context.getCurrentIterators().size() == size)) { + // Don't do anything , just change the StructType if the set is + // structset. + if (size > 1) { + // The Index resultset is a structType. + Support.Assert(indexInfo._index.getResultSetType() instanceof StructType, + " If the match level is zero & the size of mapping array is 1 then Index is surely ResultBag else StructBag"); + // The independent iterator is added as the first element + grpItrs = context.getCurrScopeDpndntItrsBasedOnSingleIndpndntItr(this.indpndntItr); + // Check if reshuffling is needed or just changing the struct + // type will suffice + boolean isReshufflingNeeded = false; + int pos = -1; + for (int i = 0; i < size; ++i) { + pos = indexInfo.mapping[i]; + isReshufflingNeeded = isReshufflingNeeded || (pos != (i + 1)); + this.indexFieldToItrsMapping[pos - 1] = (RuntimeIterator) grpItrs.get(i); + } + this.finalList = grpItrs; + // Even if Reshuffle is not need but if the iter conditions are + // present we need to do evaluation + // We can avoid iterating over the set iff reshuffling is not needed & + // there is no iter eval condition + if (isReshufflingNeeded || iterOperands != null) { + // this.expansionList = Collections.EMPTY_LIST; + this.checkList = null; + // indexReults = QueryUtils.cutDownAndExpandIndexResults(indexReults, + // indexFieldToItrsMapping, Collections.EMPTY_LIST, grpItrs, + // context, Collections.EMPTY_LIST, iterOperands); + } else { + this.structType = QueryUtils.createStructTypeForRuntimeIterators(grpItrs); + // indexReults.setElementType(structType); + // Shuffling is not needed. Index results is a StructBag + // with match level zero & no expansion needed & index fields map + // with the RuntimeIterators. But we need to change the StructType + // of the StructBag + this.shufflingNeeded = false; + } + } else { + // The finalList should not be left uninitialized, & if the match + // level is zero + // & the Index Results is a ResultBag ( & not an StructBag ) implying + // indexFieldsSize of + // 1 , then the final List should contain only the independent iterator + this.finalList = new ArrayList(); + this.finalList.add(this.indpndntItr); + Support.Assert(this.indexFieldToItrsMapping.length == 1, + "In this else block , it should be guaranteed that there exists only one iterator in query as well as index from clause & that should be nothing but the independent RuntimeIterator of the group "); + this.indexFieldToItrsMapping[0] = this.indpndntItr; + // Shuffling is needed if iter operand is not null even if index results is a + // ResultSet + // with match level zero & no expansion needed + this.shufflingNeeded = (iterOperands != null); + } + } else { + // There is some expansion or truncation needed on the data + // obtained from index.Identify a the iterators belonging to this group + // The independent iterator is added as the first element + grpItrs = context.getCurrScopeDpndntItrsBasedOnSingleIndpndntItr(this.indpndntItr); + // Create an array of RuntimeIterators which map to the fields of the + // Index set. + // For those fields which do not have corresponding RuntimeIterator , keep + // it as null; + int pos = -1; + this.finalList = completeExpansion ? context.getCurrentIterators() : grpItrs; + // This is the List of runtimeIterators which have corresponding fields + // in the resultset obtained from Index usage. This List will be populated + // only if there exists fields in index resultset which will not be + // selected + // If all the fields of index resultset will be used , then this List + // should + // be null or empty + this.checkList = new ArrayList(); + // This List contains the RuntimeIterators which are missing from + // index resultset but are present in the final iterators + this.expansionList = new LinkedList(finalList); + RuntimeIterator tempItr = null; + // boolean cutDownNeeded = false; + int unMappedFields = indexFieldsSize; + for (int i = 0; i < size; ++i) { + pos = indexInfo.mapping[i]; + if (pos > 0) { + tempItr = (RuntimeIterator) grpItrs.get(i); + this.indexFieldToItrsMapping[pos - 1] = tempItr; + this.expansionList.remove(tempItr); + this.checkList.add(tempItr); + --unMappedFields; + } + } + boolean cutDownNeeded = unMappedFields > 0; + if (!cutDownNeeded) + this.checkList = null; + /* + * indexReults = QueryUtils.cutDownAndExpandIndexResults(indexReults, indexFieldToItrsMapping, + * expansionList, finalList, context, checkList, iterOperands); + */ + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexCutDownExpansionHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexCutDownExpansionHelper.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexCutDownExpansionHelper.java new file mode 100644 index 0000000..d5514a5 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexCutDownExpansionHelper.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal; + +import java.util.List; + +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.Struct; +import org.apache.geode.cache.query.internal.types.StructTypeImpl; +import org.apache.geode.cache.query.types.ObjectType; + +/** + * This is a helper class which contains informaion on how to expand / cutdown index results for + * making it compatible with the query. + */ +class IndexCutDownExpansionHelper { + + /** + * booelan which identifies if a cutdown of index results is needed or not. + */ + boolean cutDownNeeded = false; + + /** + * A SelectResults ( ResultBag or StructBag) object used to prevent unnecessary expansion of index + * results as described in IndexConditionalHelper class. + */ + SelectResults checkSet = null; + + /** + * ObjectType for the checkSet object ( An ObjectType for a ResultBag & StructType for a + * StructBag) + */ + ObjectType checkType = null; + + int checkSize = -1; + + IndexCutDownExpansionHelper(List checkList, ExecutionContext context) { + cutDownNeeded = checkList != null && (checkSize = checkList.size()) > 0; + if (cutDownNeeded) { + Boolean orderByClause = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_ORDER_BY_AT_INDEX); + boolean useLinkedDataStructure = false; + boolean nullValuesAtStart = true; + if (orderByClause != null && orderByClause) { + List orderByAttrs = (List) context.cacheGet(CompiledValue.ORDERBY_ATTRIB); + useLinkedDataStructure = orderByAttrs.size() == 1; + nullValuesAtStart = !((CompiledSortCriterion) orderByAttrs.get(0)).getCriterion(); + } + if (checkSize > 1) { + + checkType = QueryUtils.createStructTypeForRuntimeIterators(checkList); + if (useLinkedDataStructure) { + checkSet = context.isDistinct() ? new LinkedStructSet((StructTypeImpl) checkType) + : new SortedResultsBag<Struct>((StructTypeImpl) checkType, nullValuesAtStart); + } else { + checkSet = QueryUtils.createStructCollection(context, (StructTypeImpl) checkType); + } + } else { + checkType = ((RuntimeIterator) checkList.get(0)).getElementType(); + if (useLinkedDataStructure) { + checkSet = context.isDistinct() ? new LinkedResultSet(checkType) + : new SortedResultsBag(checkType, nullValuesAtStart); + } else { + checkSet = QueryUtils.createResultCollection(context, checkType); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java index 7d029a1..696e501 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryExecutionContext.java @@ -14,17 +14,14 @@ */ package org.apache.geode.cache.query.internal; -import it.unimi.dsi.fastutil.ints.IntOpenHashSet; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Stack; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.query.Query; -import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.pdx.internal.PdxString; /** @@ -37,11 +34,10 @@ import org.apache.geode.pdx.internal.PdxString; public class QueryExecutionContext extends ExecutionContext { private int nextFieldNum = 0; - private Query query; - private IntOpenHashSet successfulBuckets; - private boolean cqQueryContext = false; + private final Query query; + private boolean cqQueryContext = false; private List bucketList; @@ -66,32 +62,19 @@ public class QueryExecutionContext extends ExecutionContext { /** * List of query index names that the user has hinted on using */ + private ArrayList hints = null; - private ArrayList<String> hints = null; - - /** - * @param bindArguments - * @param cache - */ - public QueryExecutionContext(Object[] bindArguments, Cache cache) { + public QueryExecutionContext(Object[] bindArguments, InternalCache cache) { super(bindArguments, cache); + this.query = null; } - - - /** - * @param bindArguments - * @param cache - * @param query - */ - public QueryExecutionContext(Object[] bindArguments, Cache cache, Query query) { + public QueryExecutionContext(Object[] bindArguments, InternalCache cache, Query query) { super(bindArguments, cache); this.query = query; } - - // General purpose caching methods for data that is only valid for one - // query execution + @Override void cachePut(Object key, Object value) { if (key.equals(CompiledValue.QUERY_INDEX_HINTS)) { setHints((ArrayList) value); @@ -111,10 +94,12 @@ public class QueryExecutionContext extends ExecutionContext { execCache.put(key, value); } + @Override public Object cacheGet(Object key) { return cacheGet(key, null); } + @Override public Object cacheGet(Object key, Object defaultValue) { // execCache can be empty in cases where we are doing adds to indexes // in that case, we use a default execCache @@ -132,10 +117,12 @@ public class QueryExecutionContext extends ExecutionContext { return defaultValue; } + @Override public void pushExecCache(int scopeNum) { execCacheStack.push(scopeNum); } + @Override public void popExecCache() { execCacheStack.pop(); } @@ -143,51 +130,49 @@ public class QueryExecutionContext extends ExecutionContext { /** * Added to reset the state from the last execution. This is added for CQs only. */ + @Override public void reset() { super.reset(); this.execCacheStack.clear(); } + @Override int nextFieldNum() { return this.nextFieldNum++; } + @Override public void setCqQueryContext(boolean cqQuery) { this.cqQueryContext = cqQuery; } + @Override public boolean isCqQueryContext() { return this.cqQueryContext; } - + @Override public Query getQuery() { return query; } + @Override public void setBucketList(List list) { this.bucketList = list; - this.successfulBuckets = new IntOpenHashSet(); } + @Override public List getBucketList() { return this.bucketList; } - public void addToSuccessfulBuckets(int bId) { - this.successfulBuckets.add(bId); - } - - public int[] getSuccessfulBuckets() { - return this.successfulBuckets.toIntArray(); - } - /** * creates new PdxString from String and caches it */ + @Override public PdxString getSavedPdxString(int index) { if (bindArgumentToPdxStringMap == null) { - bindArgumentToPdxStringMap = new HashMap<Integer, PdxString>(); + bindArgumentToPdxStringMap = new HashMap<>(); } PdxString pdxString = bindArgumentToPdxStringMap.get(index - 1); @@ -196,7 +181,6 @@ public class QueryExecutionContext extends ExecutionContext { bindArgumentToPdxStringMap.put(index - 1, pdxString); } return pdxString; - } public boolean isIndexUsed() { @@ -207,8 +191,8 @@ public class QueryExecutionContext extends ExecutionContext { this.indexUsed = indexUsed; } - public void setHints(ArrayList<String> hints) { - this.hints = new ArrayList(); + private void setHints(ArrayList<String> hints) { + this.hints = new ArrayList<>(); this.hints.addAll(hints); } @@ -217,7 +201,7 @@ public class QueryExecutionContext extends ExecutionContext { * @return true if the index name was hinted by the user */ public boolean isHinted(String indexName) { - return hints != null ? hints.contains(indexName) : false; + return hints != null && hints.contains(indexName); } /** @@ -227,11 +211,11 @@ public class QueryExecutionContext extends ExecutionContext { return -(hints.size() - hints.indexOf(indexName)); } - public boolean hasHints() { + boolean hasHints() { return hints != null; } - public boolean hasMultiHints() { + boolean hasMultiHints() { return hints != null && hints.size() > 1; } } http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java index 569fbb0..89885f1 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.cache.query.internal; import java.util.concurrent.ConcurrentHashMap; @@ -50,26 +49,23 @@ public class QueryMonitor implements Runnable { * canceled due to max query execution timeout. TRUE it the query is canceled due to max query * execution timeout timeout. */ - private static ThreadLocal<AtomicBoolean> queryExecutionStatus = - new ThreadLocal<AtomicBoolean>() { - @Override - protected AtomicBoolean initialValue() { - return new AtomicBoolean(Boolean.FALSE); - } - }; + private static final ThreadLocal<AtomicBoolean> queryExecutionStatus = + ThreadLocal.withInitial(() -> new AtomicBoolean(Boolean.FALSE)); private final long maxQueryExecutionTime; private static final ConcurrentLinkedQueue queryThreads = new ConcurrentLinkedQueue(); private Thread monitoringThread; + private final AtomicBoolean stopped = new AtomicBoolean(Boolean.FALSE); - /** For DUnit test purpose */ + /** For DUnit test purpose TODO: delete this ConcurrentMap */ private ConcurrentMap queryMonitorTasks = null; // Variables for cancelling queries due to low memory private volatile static Boolean LOW_MEMORY = Boolean.FALSE; + private volatile static long LOW_MEMORY_USED_BYTES = 0; public QueryMonitor(long maxQueryExecutionTime) { @@ -92,7 +88,7 @@ public class QueryMonitor implements Runnable { QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryExecutionStatus.get()); synchronized (queryThreads) { queryThreads.add(queryTask); - queryThreads.notify(); + queryThreads.notifyAll(); } if (logger.isDebugEnabled()) { @@ -101,7 +97,7 @@ public class QueryMonitor implements Runnable { queryThreads.size(), queryThread.getId(), query.getQueryString(), queryThread); } - /** For dunit test purpose */ + // For dunit test purpose if (GemFireCacheImpl.getInstance() != null && GemFireCacheImpl.getInstance().testMaxQueryExecutionTime > 0) { if (this.queryMonitorTasks == null) { @@ -113,14 +109,12 @@ public class QueryMonitor implements Runnable { /** * Stops monitoring the query. Removes the passed thread from QueryMonitor queue. - * - * @param queryThread */ public void stopMonitoringQueryThread(Thread queryThread, Query query) { // Re-Set the queryExecution status on the LocalThread. QueryExecutionTimeoutException testException = null; - DefaultQuery q = (DefaultQuery) query; - boolean[] queryCompleted = q.getQueryCompletedForMonitoring(); + DefaultQuery defaultQuery = (DefaultQuery) query; + boolean[] queryCompleted = defaultQuery.getQueryCompletedForMonitoring(); synchronized (queryCompleted) { queryExecutionStatus.get().getAndSet(Boolean.FALSE); @@ -137,7 +131,7 @@ public class QueryMonitor implements Runnable { // Its seen that in some cases based on OS thread scheduling the thread can sleep much // longer than the specified time. if (queryTask != null) { - if ((currentTime - queryTask.StartTime) > maxTimeSet) { + if (currentTime - queryTask.StartTime > maxTimeSet) { // The sleep() is unpredictable. testException = new QueryExecutionTimeoutException( "The QueryMonitor thread may be sleeping longer than" @@ -148,7 +142,7 @@ public class QueryMonitor implements Runnable { } // END - DUnit Test purpose. - q.setQueryCompletedForMonitoring(true); + defaultQuery.setQueryCompletedForMonitoring(true); // Remove the query task from the queue. queryThreads.remove(new QueryThreadTask(queryThread, null, null)); } @@ -183,11 +177,11 @@ public class QueryMonitor implements Runnable { */ public void stopMonitoring() { // synchronized in the rare case where query monitor was created but not yet run - synchronized (stopped) { + synchronized (this.stopped) { if (this.monitoringThread != null) { this.monitoringThread.interrupt(); } - stopped.set(Boolean.TRUE); + this.stopped.set(Boolean.TRUE); } } @@ -195,26 +189,28 @@ public class QueryMonitor implements Runnable { * Starts monitoring the query. If query runs longer than the set MAX_QUERY_EXECUTION_TIME, * interrupts the thread executing the query. */ + @Override public void run() { // if the query monitor is stopped before run has been called, we should not run - synchronized (stopped) { - if (stopped.get()) { + synchronized (this.stopped) { + if (this.stopped.get()) { queryThreads.clear(); return; } this.monitoringThread = Thread.currentThread(); } try { - QueryThreadTask queryTask = null; - long sleepTime = 0; + QueryThreadTask queryTask; + long sleepTime; + // TODO: while-block cannot complete without throwing while (true) { // Get the first query task from the queue. This query will have the shortest // remaining time that needs to canceled first. queryTask = (QueryThreadTask) queryThreads.peek(); if (queryTask == null) { // Empty queue. - synchronized (this.queryThreads) { - this.queryThreads.wait(); + synchronized (queryThreads) { + queryThreads.wait(); } continue; } @@ -222,7 +218,7 @@ public class QueryMonitor implements Runnable { long currentTime = System.currentTimeMillis(); // Check if the sleepTime is greater than the remaining query execution time. - if ((currentTime - queryTask.StartTime) < this.maxQueryExecutionTime) { + if (currentTime - queryTask.StartTime < this.maxQueryExecutionTime) { sleepTime = this.maxQueryExecutionTime - (currentTime - queryTask.StartTime); // Its been noted that the sleep is not guaranteed to wait for the specified // time (as stated in Suns doc too), it depends on the OSs thread scheduling @@ -256,16 +252,18 @@ public class QueryMonitor implements Runnable { logger.debug("Query Execution for the thread {} got canceled.", queryTask.queryThread); } } - } catch (InterruptedException ex) { + } catch (InterruptedException ignore) { if (logger.isDebugEnabled()) { logger.debug("Query Monitoring thread got interrupted."); } } finally { - this.queryThreads.clear(); + queryThreads.clear(); } } - // Assumes LOW_MEMORY will only be set if query monitor is enabled + /** + * Assumes LOW_MEMORY will only be set if query monitor is enabled + */ public static boolean isLowMemory() { return LOW_MEMORY; } @@ -283,21 +281,22 @@ public class QueryMonitor implements Runnable { } public void cancelAllQueriesDueToMemory() { - synchronized (this.queryThreads) { + synchronized (queryThreads) { QueryThreadTask queryTask = (QueryThreadTask) queryThreads.poll(); while (queryTask != null) { cancelQueryDueToLowMemory(queryTask, LOW_MEMORY_USED_BYTES); queryTask = (QueryThreadTask) queryThreads.poll(); } queryThreads.clear(); - queryThreads.notify(); + queryThreads.notifyAll(); } } private void cancelQueryDueToLowMemory(QueryThreadTask queryTask, long memoryThreshold) { boolean[] queryCompleted = ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring(); synchronized (queryCompleted) { - if (!queryCompleted[0]) { // cancel if query is not completed + if (!queryCompleted[0]) { + // cancel if query is not completed String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY .toLocalizedString(memoryThreshold); ((DefaultQuery) queryTask.query).setCanceled(true, @@ -307,25 +306,27 @@ public class QueryMonitor implements Runnable { } } - // FOR TEST PURPOSE + /** FOR TEST PURPOSE */ public int getQueryMonitorThreadCount() { - return this.queryThreads.size(); + return queryThreads.size(); } /** * Query Monitoring task, placed in the queue. - * */ - private class QueryThreadTask { + private static class QueryThreadTask { - private final long StartTime; + // package-private to avoid synthetic accessor + final long StartTime; - private final Thread queryThread; + // package-private to avoid synthetic accessor + final Thread queryThread; - private final Query query; - - private final AtomicBoolean queryExecutionStatus; + // package-private to avoid synthetic accessor + final Query query; + // package-private to avoid synthetic accessor + final AtomicBoolean queryExecutionStatus; QueryThreadTask(Thread queryThread, Query query, AtomicBoolean queryExecutionStatus) { this.StartTime = System.currentTimeMillis(); @@ -355,12 +356,11 @@ public class QueryMonitor implements Runnable { @Override public String toString() { - return new StringBuffer().append("QueryThreadTask[StartTime:").append(this.StartTime) + return new StringBuilder().append("QueryThreadTask[StartTime:").append(this.StartTime) .append(", queryThread:").append(this.queryThread).append(", threadId:") .append(this.queryThread.getId()).append(", query:").append(this.query.getQueryString()) - .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append("]") + .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append(']') .toString(); } - } }