http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 857a952..57fa25a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE;
 import static org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 
 import java.sql.SQLException;
@@ -54,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
@@ -140,6 +142,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
                 public Object getJobId() {
                     return HashJoinPlan.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return NO_OP_INSTANCE;
+                }
             }));
         }
         

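The HashJoinPlan hunk above opts the hash-join cache-building sub-tasks out of task-level metrics by returning a shared no-op holder instead of null. Below is a minimal, self-contained sketch of that null-object pattern; TaskMetrics, RecordingTaskMetrics, and CacheBuildJob are hypothetical names standing in for Phoenix's TaskExecutionMetricsHolder wiring.

    import java.util.concurrent.Callable;

    // Hypothetical stand-in for a per-task metrics holder.
    interface TaskMetrics {
        void addTaskExecutionTime(long millis);

        // One shared, allocation-free instance; callers that opt out return this instead of null.
        TaskMetrics NO_OP = millis -> { /* deliberately ignore */ };
    }

    // A task that records its own execution time only when metrics are enabled.
    class CacheBuildJob implements Callable<Void> {
        private final TaskMetrics metrics;

        CacheBuildJob(boolean metricsEnabled, TaskMetrics recording) {
            this.metrics = metricsEnabled ? recording : TaskMetrics.NO_OP;
        }

        @Override
        public Void call() {
            long start = System.currentTimeMillis();
            // ... build the server-side cache here ...
            metrics.addTaskExecutionTime(System.currentTimeMillis() - start);
            return null;
        }
    }

    class RecordingTaskMetrics implements TaskMetrics {
        private long totalExecutionMillis;

        @Override
        public void addTaskExecutionTime(long millis) {
            totalExecutionMillis += millis;
        }
    }
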
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 99f41b2..af3bcf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -39,7 +43,11 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.MutationMetricQueue;
+import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
+import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -65,9 +73,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
 
 /**
  * 
@@ -85,11 +90,17 @@ public class MutationState implements SQLCloseable {
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private long sizeOffset;
     private int numRows = 0;
+    private final MutationMetricQueue mutationMetricQueue;
+    private ReadMetricQueue readMetricQueue;
     
-    MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
     }
 
     public MutationState(long maxSize, PhoenixConnection connection) {
@@ -108,6 +119,12 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
+    public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
+        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
+        state.sizeOffset = 0;
+        return state;
+    }
+    
     private void throwIfTooBig() {
         if (numRows > maxSize) {
             // TODO: throw SQLException ?
@@ -120,17 +137,18 @@ public class MutationState implements SQLCloseable {
     }
     
     /**
-     * Combine a newer mutation with this one, where in the event of overlaps,
-     * the newer one will take precedence.
-     * @param newMutation the newer mutation
+     * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
+     * Combine any metrics collected for the newer mutation.
+     * 
+     * @param newMutationState the newer mutation state
      */
-    public void join(MutationState newMutation) {
-        if (this == newMutation) { // Doesn't make sense
+    public void join(MutationState newMutationState) {
+        if (this == newMutationState) { // Doesn't make sense
             return;
         }
-        this.sizeOffset += newMutation.sizeOffset;
+        this.sizeOffset += newMutationState.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
@@ -168,6 +186,12 @@ public class MutationState implements SQLCloseable {
                 }
             }
         }
+        mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
+        if (readMetricQueue == null) {
+            readMetricQueue = newMutationState.readMetricQueue;
+        } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) {
+            readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue);
+        }
         throwIfTooBig();
     }
     
@@ -332,18 +356,15 @@ public class MutationState implements SQLCloseable {
         return timeStamps;
     }
     
-    private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) {
+    private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
-        int keyValueCount = 0;
-        if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) {
+        if (GlobalClientMetrics.isMetricsEnabled()) {
             for (Mutation mutation : mutations) {
                 byteSize += mutation.heapSize();
             }
-            MUTATION_BYTES.update(byteSize);
-            if (logger.isDebugEnabled()) {
-                logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection));
-            }
         }
+        GLOBAL_MUTATION_BYTES.update(byteSize);
+        return byteSize;
     }
     
     @SuppressWarnings("deprecation")
@@ -352,126 +373,134 @@ public class MutationState implements SQLCloseable {
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
         Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
-
         // add tracing for this operation
-        TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
-        Span span = trace.getSpan();
-        while (iterator.hasNext()) {
-            Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry 
= iterator.next();
-            Map<ImmutableBytesPtr,RowMutationState> valuesMap = 
entry.getValue();
-            TableRef tableRef = entry.getKey();
-            PTable table = tableRef.getTable();
-            table.getIndexMaintainers(tempPtr, connection);
-            boolean hasIndexMaintainers = tempPtr.getLength() > 0;
-            boolean isDataTable = true;
-            long serverTimestamp = serverTimeStamps[i++];
-            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
-            while (mutationsIterator.hasNext()) {
-                Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                byte[] htableName = pair.getFirst();
-                List<Mutation> mutations = pair.getSecond();
-                
-                //create a span per target table
-                //TODO maybe we can be smarter about the table name to string 
here?
-                Span child = Tracing.child(span,"Writing mutation batch for 
table: "+Bytes.toString(htableName));
+        try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
+            Span span = trace.getSpan();
+            while (iterator.hasNext()) {
+                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
+                // at this point we are going through mutations for each table
 
-                int retryCount = 0;
-                boolean shouldRetry = false;
-                do {
-                    ServerCache cache = null;
-                    if (hasIndexMaintainers && isDataTable) {
-                        byte[] attribValue = null;
-                        byte[] uuidValue;
-                        if 
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, 
tempPtr.getLength())) {
-                            IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
-                            cache = client.addIndexMetadataCache(mutations, 
tempPtr);
-                            child.addTimelineAnnotation("Updated index 
metadata cache");
-                            uuidValue = cache.getId();
-                            // If we haven't retried yet, retry for this case 
only, as it's possible that
-                            // a split will occur after we send the index 
metadata cache to all known
-                            // region servers.
-                            shouldRetry = true;
-                        } else {
-                            attribValue = 
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                            uuidValue = ServerCacheClient.generateId();
-                        }
-                        // Either set the UUID to be able to access the index 
metadata from the cache
-                        // or set the index metadata directly on the Mutation
-                        for (Mutation mutation : mutations) {
-                            if (tenantId != null) {
-                                
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-                            }
-                            
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            if (attribValue != null) {
-                                
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                            }
-                        }
-                    }
-                    
-                    SQLException sqlE = null;
-                    HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
-                    try {
-                        logMutationSize(hTable, mutations, connection);
-                        MUTATION_BATCH_SIZE.update(mutations.size());
-                        long startTime = System.currentTimeMillis();
-                        child.addTimelineAnnotation("Attempt " + retryCount);
-                        hTable.batch(mutations);
-                        child.stop();
-                        long duration = System.currentTimeMillis() - startTime;
-                        MUTATION_COMMIT_TIME.update(duration);
-                        shouldRetry = false;
-                        if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + 
mutations.size() + " mutations into " + table.getName().getString() + ": " + 
duration + " ms", connection));
-                    } catch (Exception e) {
-                        SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
-                        if (inferredE != null) {
-                            if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                // Swallow this exception once, as it's 
possible that we split after sending the index metadata
-                                // and one of the region servers doesn't have 
it. This will cause it to have it the next go around.
-                                // If it fails again, we don't retry.
-                                String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
-                                logger.warn(LogUtil.addCustomAnnotations(msg, 
connection));
-                                
connection.getQueryServices().clearTableRegionCache(htableName);
+                Map<ImmutableBytesPtr,RowMutationState> valuesMap = 
entry.getValue();
+                // above is mutations for a table where the first part is the 
row key and the second part is column values.
 
-                                // add a new child span as this one failed
-                                child.addTimelineAnnotation(msg);
-                                child.stop();
-                                child = Tracing.child(span,"Failed batch, 
attempting retry");
+                TableRef tableRef = entry.getKey();
+                PTable table = tableRef.getTable();
+                table.getIndexMaintainers(tempPtr, connection);
+                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+                boolean isDataTable = true;
+                long serverTimestamp = serverTimeStamps[i++];
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
+                // above returns an iterator of pair where the first  
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutations = pair.getSecond();
 
-                                continue;
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to 
string here?
+                    Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
+
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        ServerCache cache = null;
+                        if (hasIndexMaintainers && isDataTable) {
+                            byte[] attribValue = null;
+                            byte[] uuidValue;
+                            if 
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, 
tempPtr.getLength())) {
+                                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
+                                cache = 
client.addIndexMetadataCache(mutations, tempPtr);
+                                child.addTimelineAnnotation("Updated index 
metadata cache");
+                                uuidValue = cache.getId();
+                                // If we haven't retried yet, retry for this 
case only, as it's possible that
+                                // a split will occur after we send the index 
metadata cache to all known
+                                // region servers.
+                                shouldRetry = true;
+                            } else {
+                                attribValue = 
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
+                                uuidValue = ServerCacheClient.generateId();
+                            }
+                            // Either set the UUID to be able to access the 
index metadata from the cache
+                            // or set the index metadata directly on the 
Mutation
+                            for (Mutation mutation : mutations) {
+                                if (tenantId != null) {
+                                    
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                                }
+                                
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                if (attribValue != null) {
+                                    
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                }
                             }
-                            e = inferredE;
                         }
-                        sqlE = new CommitException(e, 
getUncommittedSattementIndexes());
-                    } finally {
+
+                        SQLException sqlE = null;
+                        HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
                         try {
-                            hTable.close();
-                        } catch (IOException e) {
-                            if (sqlE != null) {
-                                
sqlE.setNextException(ServerUtil.parseServerException(e));
-                            } else {
-                                sqlE = ServerUtil.parseServerException(e);
+                            long numMutations = mutations.size();
+                            GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+                            
+                            long startTime = System.currentTimeMillis();
+                            child.addTimelineAnnotation("Attempt " + 
retryCount);
+                            hTable.batch(mutations);
+                            child.stop();
+                            shouldRetry = false;
+                            long mutationCommitTime = 
System.currentTimeMillis() - startTime;
+                            
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+                            
+                            long mutationSizeBytes = 
calculateMutationSize(mutations);
+                            MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
+                            
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
+                        } catch (Exception e) {
+                            SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's 
possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't 
have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
+                                    
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    
connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, 
attempting retry");
+
+                                    continue;
+                                }
+                                e = inferredE;
                             }
+                            sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
                         } finally {
                             try {
-                                if (cache != null) {
-                                    cache.close();
+                                hTable.close();
+                            } catch (IOException e) {
+                                if (sqlE != null) {
+                                    
sqlE.setNextException(ServerUtil.parseServerException(e));
+                                } else {
+                                    sqlE = ServerUtil.parseServerException(e);
                                 }
                             } finally {
-                                if (sqlE != null) {
-                                    throw sqlE;
+                                try {
+                                    if (cache != null) {
+                                        cache.close();
+                                    }
+                                } finally {
+                                    if (sqlE != null) {
+                                        throw sqlE;
+                                    }
                                 }
                             }
                         }
-                    }
-                } while (shouldRetry && retryCount++ < 1);
-                isDataTable = false;
-            }
-            if (tableRef.getTable().getType() != PTableType.INDEX) {
-                numRows -= entry.getValue().size();
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= entry.getValue().size();
+                }
+                iterator.remove(); // Remove batches as we process them
             }
-            iterator.remove(); // Remove batches as we process them
         }
-        trace.close();
         assert(numRows==0);
         assert(this.mutations.isEmpty());
     }
@@ -481,7 +510,7 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
     }
     
-    private int[] getUncommittedSattementIndexes() {
+    private int[] getUncommittedStatementIndexes() {
        int[] result = new int[0];
       for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
               for (RowMutationState rowMutationState : rowMutations.values()) {
@@ -533,12 +562,23 @@ public class MutationState implements SQLCloseable {
         int[] getStatementIndexes() {
             return statementIndexes;
         }
-        
+
         void join(RowMutationState newRow) {
             getColumnValues().putAll(newRow.getColumnValues());
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
         }
-        
+    }
+    
+    public ReadMetricQueue getReadMetricQueue() {
+        return readMetricQueue;
+    }
 
+    public void setReadMetricQueue(ReadMetricQueue readMetricQueue) {
+        this.readMetricQueue = readMetricQueue;
     }
+
+    public MutationMetricQueue getMutationMetricQueue() {
+        return mutationMetricQueue;
+    }
+    
 }

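The MutationState changes keep a per-request queue of mutation metrics keyed by table name, fall back to a shared no-op queue when request-level metrics are disabled, and merge the queues when one MutationState is joined into another. A rough sketch of such a combinable, per-table accumulator is below; MutationCounts and MetricQueue are illustrative names only, not the Phoenix MutationMetricQueue API.

    import java.util.HashMap;
    import java.util.Map;

    // Per-table mutation totals that can be merged with another set of totals.
    class MutationCounts {
        long mutationCount;
        long byteCount;
        long commitTimeMillis;

        void combine(MutationCounts other) {
            this.mutationCount += other.mutationCount;
            this.byteCount += other.byteCount;
            this.commitTimeMillis += other.commitTimeMillis;
        }
    }

    class MetricQueue {
        // Shared no-op instance used when request-level metrics are turned off.
        static final MetricQueue NO_OP = new MetricQueue() {
            @Override
            void addMetricsForTable(String table, MutationCounts counts) { /* drop */ }
            @Override
            void combine(MetricQueue other) { /* drop */ }
        };

        private final Map<String, MutationCounts> perTable = new HashMap<>();

        void addMetricsForTable(String table, MutationCounts counts) {
            perTable.merge(table, counts, (existing, added) -> { existing.combine(added); return existing; });
        }

        // Called when one MutationState is joined into another.
        void combine(MetricQueue other) {
            other.perTable.forEach(this::addMetricsForTable);
        }
    }
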
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 031b58b..2bed3a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -49,7 +49,7 @@ public class UnionPlan implements QueryPlan {
     private final FilterableStatement statement;
     private final ParameterMetaData paramMetaData;
     private final OrderBy orderBy;
-    private final StatementContext context;
+    private final StatementContext parentContext;
     private final Integer limit;
     private final GroupBy groupBy;
     private final RowProjector projector;
@@ -59,7 +59,7 @@ public class UnionPlan implements QueryPlan {
 
     public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException {
-        this.context = context;
+        this.parentContext = context;
         this.statement = statement;
         this.tableRef = table;
         this.projector = projector;
@@ -128,7 +128,7 @@ public class UnionPlan implements QueryPlan {
     }
 
     public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException {
-        this.iterators = new UnionResultIterators(plans);
+        this.iterators = new UnionResultIterators(plans, parentContext);
         ResultIterator scanner;      
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
 
@@ -175,7 +175,7 @@ public class UnionPlan implements QueryPlan {
 
     @Override
     public StatementContext getContext() {
-        return context;
+        return parentContext;
     }
 
     @Override

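The UnionPlan change renames the stored context to parentContext and hands it to UnionResultIterators, so every branch of the UNION records its read metrics into the same parent request context. A toy sketch of that sharing is below; ParentContext, SubPlan, and UnionExample are invented names used only to illustrate the idea.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // One metrics sink owned by the outermost (parent) statement context.
    class ParentContext {
        private final Map<String, AtomicLong> readMetrics = new ConcurrentHashMap<>();

        AtomicLong metric(String name) {
            return readMetrics.computeIfAbsent(name, k -> new AtomicLong());
        }
    }

    // Each branch of the UNION records into the parent's sink instead of a private one.
    class SubPlan {
        long execute(ParentContext parent) {
            long bytesScanned = 0; // ... run the underlying query and sum result sizes ...
            parent.metric("SCAN_BYTES").addAndGet(bytesScanned);
            return bytesScanned;
        }
    }

    class UnionExample {
        static long executeAll(List<SubPlan> plans, ParentContext parent) {
            long total = 0;
            for (SubPlan plan : plans) {
                total += plan.execute(parent); // all branches aggregate into one place
            }
            return total;
        }
    }
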
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 6a3847b..43731cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -18,8 +18,8 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.sql.SQLException;
@@ -540,12 +540,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     } catch (ExecutionException e) {
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) { 
+                        } catch (StaleRegionBoundaryCacheException e2) {
                             // Catch only to try to recover from region boundary cache being out of date
                             List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
                             if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                 services.clearTableRegionCache(physicalTableName);
                                 clearedCache = true;
+                                context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
                             // Resubmit just this portion of work again
                             Scan oldScan = scanPair.getFirst();
@@ -582,7 +583,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             success = true;
             return iterators;
         } catch (TimeoutException e) {
-            QUERY_TIMEOUT.increment();
+            context.getOverallQueryMetrics().queryTimedOut();
+            GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
             // thrown when a thread times out waiting for the future.get() call to return
             toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
                     .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
@@ -616,7 +618,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             } finally {
                 if (toThrow != null) {
-                    FAILED_QUERY.increment();
+                    GLOBAL_FAILED_QUERY_COUNTER.increment();
+                    context.getOverallQueryMetrics().queryFailed();
                     throw toThrow;
                 }
             }
@@ -639,7 +642,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         if (futurePair != null) {
                             Future<PeekingResultIterator> future = futurePair.getSecond();
                             if (future != null) {
-                                cancelledWork |= future.cancel(false);
+                                future.cancel(false);
                             }
                         }
                     }

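BaseResultIterators now records timeouts and failures twice: once in the JVM-wide GlobalClientMetrics counters and once in the per-query OverallQueryMetrics hanging off the statement context. A condensed sketch of that dual accounting follows; GLOBAL_FAILED_QUERIES, QueryMetrics, and runQuery are hypothetical names that only illustrate the shape of the change.

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicLong;

    class QueryMetricsExample {
        // Process-wide counters, akin to the GlobalClientMetrics counters in the diff.
        static final AtomicLong GLOBAL_FAILED_QUERIES = new AtomicLong();
        static final AtomicLong GLOBAL_QUERY_TIMEOUTS = new AtomicLong();

        // Per-request metrics object, akin to OverallQueryMetrics.
        static class QueryMetrics {
            boolean timedOut;
            boolean failed;
            void queryTimedOut() { timedOut = true; }
            void queryFailed() { failed = true; }
        }

        static String runQuery(Future<String> work, long timeoutMillis, QueryMetrics metrics) throws Exception {
            try {
                return work.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // A timeout is counted at both scopes before the error is surfaced.
                metrics.queryTimedOut();
                GLOBAL_QUERY_TIMEOUTS.incrementAndGet();
                throw e;
            } catch (ExecutionException e) {
                metrics.queryFailed();
                GLOBAL_FAILED_QUERIES.incrementAndGet();
                throw e;
            }
        }
    }
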
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index e1ee8db..f272e55 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.sql.SQLException;
 import java.util.List;
@@ -66,18 +67,17 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         }
 
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            scanner.close(); //close the iterator since we don't need it anymore.
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
             return new ChunkedResultIterator(delegateFactory, context, tableRef, scan,
                     context.getConnection().getQueryServices().getProps().getLong(
                                         QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                        QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
+                                        QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
         }
     }
 
-    public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
-            StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
+    private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory,
+            StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
@@ -87,9 +87,9 @@ public class ChunkedResultIterator implements PeekingResultIterator {
         // to get parallel scans kicked off in separate threads. If we delay this,
         // we'll get serialized behavior (see PHOENIX-
         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
-        ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
-                new TableResultIterator(context, tableRef, scan), chunkSize);
-        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+        ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
+        String tableName = tableRef.getTable().getPhysicalName().getString();
+        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
     }
 
     @Override
@@ -118,9 +118,10 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             scan = ScanUtil.newScan(scan);
             scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
+            String tableName = tableRef.getTable().getPhysicalName().getString();
             ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
-                    new TableResultIterator(context, tableRef, scan), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
+                    new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
         }
         return resultIterator;
     }

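Note also that the ChunkedResultIterator factory no longer closes the scanner it is handed and reopens a fresh one: the already-open scanner is passed through and consumed for the first chunk, and only the follow-up chunks (with their per-table SCAN_BYTES metric) are created from the table reference. A stripped-down sketch of that reuse is below; ChunkReader and RowSource are hypothetical names, not Phoenix types.

    import java.util.Iterator;
    import java.util.function.Supplier;

    // Hypothetical row source; the real code wraps an HBase scanner.
    interface RowSource {
        Iterator<String> rows();
    }

    class ChunkReader {
        private final Supplier<RowSource> nextChunkSource;
        private RowSource current;

        // The first, already-open source is reused instead of being closed and reopened.
        ChunkReader(RowSource firstChunk, Supplier<RowSource> nextChunkSource) {
            this.current = firstChunk;
            this.nextChunkSource = nextChunkSource;
        }

        Iterator<String> currentChunk() {
            return current.rows();
        }

        // Subsequent chunks are created lazily, starting after the last key seen.
        void advance() {
            current = nextChunkSource.get();
        }
    }
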
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index df8f658..f25e373 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -25,10 +25,10 @@ import org.apache.phoenix.compile.StatementContext;
 public interface ParallelIteratorFactory {
     public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName)
                 throws SQLException {
             return LookAheadResultIterator.wrap(scanner);
         }
     };
-    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index be10c20..2dfbfe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS;
 
 import java.sql.SQLException;
 import java.util.Collections;
@@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -79,19 +83,25 @@ public class ParallelIterators extends BaseResultIterators {
         // Shuffle so that we start execution across many machines
         // before we fill up the thread pool
         Collections.shuffle(scanLocations);
-        PARALLEL_SCANS.update(scanLocations.size());
+        ReadMetricQueue readMetrics = context.getReadMetricsQueue();
+        final String physicalTableName = tableRef.getTable().getPhysicalName().getString();
+        int numScans = scanLocations.size();
+        context.getOverallQueryMetrics().updateNumParallelScans(numScans);
+        GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
         for (ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
+            final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
+            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
+                
                 @Override
                 public PeekingResultIterator call() throws Exception {
                     long startTime = System.currentTimeMillis();
-                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan);
+                    ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanMetrics);
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan);
+                    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, physicalTableName);
                     
                     // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
                     iterator.peek();
@@ -109,6 +119,11 @@ public class ParallelIterators extends BaseResultIterators {
                 public Object getJobId() {
                     return ParallelIterators.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return taskMetrics;
+                }
             }, "Parallel scanner for table: " + 
tableRef.getTable().getName().getString()));
             // Add our future in the right place so that we can concatenate the
             // results of the inner futures versus merge sorting across all of 
them.

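ParallelIterators threads the physical table name through the iterator factory so each scan task can allot its own SCAN_BYTES metric and a task-execution holder from the request's read-metric queue. The sketch below shows one way such an allotment registry could look; MetricRegistry, allot, and ScanTask are invented names, not the Phoenix ReadMetricQueue API.

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // A request-scoped registry that hands out one counter per (table, metric) pair.
    class MetricRegistry {
        private final Map<String, AtomicLong> metrics = new ConcurrentHashMap<>();

        AtomicLong allot(String tableName, String metricName) {
            return metrics.computeIfAbsent(tableName + ":" + metricName, k -> new AtomicLong());
        }
    }

    // Each parallel scan task gets its own counters up front, then updates them as it runs.
    class ScanTask implements Callable<Long> {
        private final AtomicLong scanBytes;
        private final AtomicLong taskExecutionTime;

        ScanTask(MetricRegistry registry, String physicalTableName) {
            this.scanBytes = registry.allot(physicalTableName, "SCAN_BYTES");
            this.taskExecutionTime = registry.allot(physicalTableName, "TASK_EXECUTION_TIME");
        }

        @Override
        public Long call() {
            long start = System.currentTimeMillis();
            long bytesRead = 0; // ... iterate over scanner results, summing sizes ...
            scanBytes.addAndGet(bytesRead);
            taskExecutionTime.addAndGet(System.currentTimeMillis() - start);
            return bytesRead;
        }
    }
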
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
index 4a9ad3e..92ac570 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.iterate;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -268,7 +268,7 @@ public class RoundRobinResultIterator implements ResultIterator {
                 }
             } finally {
                 if (toThrow != null) {
-                    FAILED_QUERY.increment();
+                    GLOBAL_FAILED_QUERY_COUNTER.increment();
                     throw toThrow;
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index fd65d0c..b722794 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -28,15 +28,20 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 
 public class ScanningResultIterator implements ResultIterator {
     private final ResultScanner scanner;
-    public ScanningResultIterator(ResultScanner scanner) {
+    private final CombinableMetric scanMetrics;
+    
+    public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) {
         this.scanner = scanner;
+        this.scanMetrics = scanMetrics;
     }
     
     @Override
@@ -66,17 +71,18 @@ public class ScanningResultIterator implements ResultIterator {
                return "ScanningResultIterator [scanner=" + scanner + "]";
        }
        
-       private static void calculateScanSize(Result result) {
-           if (PhoenixMetrics.isMetricsEnabled()) {
-               if (result != null) {
-                   Cell[] cells = result.rawCells();
-                   long scanResultSize = 0;
-                   for (Cell cell : cells) {
-                       KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                       scanResultSize += kv.heapSize();
-                   }
-                   SCAN_BYTES.update(scanResultSize);
-               }
-           }
-       }
+    private void calculateScanSize(Result result) {
+        if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) {
+            if (result != null) {
+                Cell[] cells = result.rawCells();
+                long scanResultSize = 0;
+                for (Cell cell : cells) {
+                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+                    scanResultSize += kv.heapSize();
+                }
+                scanMetrics.change(scanResultSize);
+                GLOBAL_SCAN_BYTES.update(scanResultSize);
+            }
+        }
+    }
 }

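ScanningResultIterator now sizes each Result by summing its cells' heap sizes and reports the total to both the global SCAN_BYTES counter and the per-request metric, skipping the work entirely when neither sink will consume it. A simplified, HBase-free sketch of that guard-then-measure pattern follows; the Cell class and Metric interface here are placeholders for the HBase and Phoenix types.

    import java.util.List;

    class ScanSizeExample {
        // Placeholder for org.apache.hadoop.hbase.Cell; only the size matters here.
        static class Cell {
            final byte[] value;
            Cell(byte[] value) { this.value = value; }
            long heapSize() { return value.length; }
        }

        interface Metric {
            void change(long delta);
            Metric NO_OP = delta -> { };
        }

        private final Metric requestScanBytes;
        private final Metric globalScanBytes;
        private final boolean globalMetricsEnabled;

        ScanSizeExample(Metric requestScanBytes, Metric globalScanBytes, boolean globalMetricsEnabled) {
            this.requestScanBytes = requestScanBytes;
            this.globalScanBytes = globalScanBytes;
            this.globalMetricsEnabled = globalMetricsEnabled;
        }

        void recordScanSize(List<Cell> cells) {
            // Only pay for the size calculation when some metric will consume it.
            if (!globalMetricsEnabled && requestScanBytes == Metric.NO_OP) {
                return;
            }
            long scanResultSize = 0;
            for (Cell cell : cells) {
                scanResultSize += cell.heapSize();
            }
            requestScanBytes.change(scanResultSize);
            globalScanBytes.change(scanResultSize);
        }
    }
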
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 6b3b5e3..516d73e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -29,11 +31,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation;
 import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.ScanUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -48,7 +48,6 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class SerialIterators extends BaseResultIterators {
-       private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
        private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
     
@@ -74,18 +73,15 @@ public class SerialIterators extends BaseResultIterators {
             Scan lastScan = scans.get(scans.size()-1);
             final Scan overallScan = ScanUtil.newScan(firstScan);
             overallScan.setStopRow(lastScan.getStopRow());
+            final String tableName = tableRef.getTable().getPhysicalName().getString();
+            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
-
                 @Override
                 public PeekingResultIterator call() throws Exception {
                        List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
                        for (final Scan scan : scans) {
-                           long startTime = System.currentTimeMillis();
-                           ResultIterator scanner = new TableResultIterator(context, tableRef, scan, ScannerCreation.DELAYED);
-                           if (logger.isDebugEnabled()) {
-                               logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
-                           }
-                           concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
+                           ResultIterator scanner = new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED);
+                           concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName));
                        }
                        PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
                     allIterators.add(concatIterator);
@@ -101,6 +97,11 @@ public class SerialIterators extends BaseResultIterators {
                 public Object getJobId() {
                     return SerialIterators.this;
                 }
+
+                @Override
+                public TaskExecutionMetricsHolder getTaskExecutionMetric() {
+                    return taskMetrics;
+                }
             }, "Serial scanner for table: " + 
tableRef.getTable().getName().getString()));
             // Add our singleton Future which will execute serially
             nestedFutures.add(Collections.singletonList(new 
Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 63d3761..0a3c32b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -17,8 +17,10 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_SIZE;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
@@ -37,6 +39,9 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.monitoring.MemoryMetricsHolder;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.SpoolingMetricsHolder;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -55,8 +60,10 @@ import org.apache.phoenix.util.TupleUtil;
  * @since 0.1
  */
 public class SpoolingResultIterator implements PeekingResultIterator {
-    private final PeekingResultIterator spoolFrom;
     
+    private final PeekingResultIterator spoolFrom;
+    private final SpoolingMetricsHolder spoolMetrics;
+    private final MemoryMetricsHolder memoryMetrics;
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
 
@@ -64,14 +71,16 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException {
-            return new SpoolingResultIterator(scanner, services);
+        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException {
+            ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
+            SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName);
+            MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName);
+            return new SpoolingResultIterator(spoolMetrics, memoryMetrics, scanner, services);
         }
-
     }
 
-    public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException {
-        this (scanner, services.getMemoryManager(),
+    private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException {
+        this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
                 services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
                 services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
                 services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
@@ -86,9 +95,15 @@ public class SpoolingResultIterator implements PeekingResultIterator {
     *  the memory manager) is exceeded.
     * @throws SQLException
     */
-    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+    SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+        this.spoolMetrics = sMetrics;
+        this.memoryMetrics = mMetrics;
         boolean success = false;
+        long startTime = System.currentTimeMillis();
         final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
+        long waitTime = System.currentTimeMillis() - startTime;
+        GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
+        memoryMetrics.getMemoryWaitTimeMetric().change(waitTime);
         DeferredFileOutputStream spoolTo = null;
         try {
             // Can't be bigger than int, since it's the max of the above allocation
@@ -96,8 +111,11 @@ public class SpoolingResultIterator implements PeekingResultIterator {
             spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
                 @Override
                 protected void thresholdReached() throws IOException {
-                    super.thresholdReached();
-                    chunk.close();
+                    try {
+                        super.thresholdReached();
+                    } finally {
+                        chunk.close();
+                    }
                 }
             };
             DataOutputStream out = new DataOutputStream(spoolTo);
@@ -115,9 +133,14 @@ public class SpoolingResultIterator implements PeekingResultIterator {
                 byte[] data = spoolTo.getData();
                 chunk.resize(data.length);
                 spoolFrom = new InMemoryResultIterator(data, chunk);
+                GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
+                memoryMetrics.getMemoryChunkSizeMetric().change(data.length);
             } else {
-                NUM_SPOOL_FILE.increment();
-                SPOOL_FILE_SIZE.update(spoolTo.getFile().length());
+                long sizeOfSpoolFile = spoolTo.getFile().length();
+                GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
+                GLOBAL_SPOOL_FILE_COUNTER.increment();
+                spoolMetrics.getNumSpoolFileMetric().increment();
+                spoolMetrics.getSpoolFileSizeMetric().change(sizeOfSpoolFile);
                 spoolFrom = new OnDiskResultIterator(spoolTo.getFile());
                 if (spoolTo.getFile() != null) {
                     spoolTo.getFile().deleteOnExit();
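
The SpoolingResultIterator hunks above do two things: thresholdReached() now wraps the super call in try/finally so the allocated memory chunk is released even when spilling to disk fails, and spool-file and memory statistics are recorded both in the global counters and in the per-request SpoolingMetricsHolder/MemoryMetricsHolder. A minimal sketch of the release pattern, with illustrative (non-Phoenix) names:

    import java.io.Closeable;
    import java.io.IOException;

    class ThresholdReachedSketch {
        // Mirrors the try/finally added to thresholdReached(): the memory chunk is
        // closed whether or not the flush-to-disk step throws.
        static void onThresholdReached(Runnable flushToDisk, Closeable memoryChunk) throws IOException {
            try {
                flushToDisk.run();
            } finally {
                memoryChunk.close();
            }
        }
    }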

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index ea13dfd..6f040d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
@@ -44,9 +45,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
     private final Scan scan;
     private final HTableInterface htable;
     private volatile ResultIterator delegate;
-
-    public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
-        this(context, tableRef, context.getScan());
+    private final CombinableMetric scanMetrics;
+    
+    public TableResultIterator(StatementContext context, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException {
+        this(context, tableRef, context.getScan(), scanMetrics);
     }
 
     /*
@@ -62,7 +64,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
                 delegate = this.delegate;
                 if (delegate == null) {
                     try {
-                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+                        this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
                     } catch (IOException e) {
                         Closeables.closeQuietly(htable);
                         throw ServerUtil.parseServerException(e);
@@ -73,13 +75,14 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
         return delegate;
     }
     
-    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
-        this(context, tableRef, scan, ScannerCreation.IMMEDIATE);
+    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException {
+        this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE);
     }
 
-    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, ScannerCreation creationMode) throws SQLException {
+    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException {
         super(context, tableRef);
         this.scan = scan;
+        this.scanMetrics = scanMetrics;
         htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
         if (creationMode == ScannerCreation.IMMEDIATE) {
                getDelegate(false);
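
Every TableResultIterator call site now has to supply the CombinableMetric that gets passed through to ScanningResultIterator. A rough sketch of the new wiring, under the assumption that the caller has already allotted a per-table scan metric from context.getReadMetricsQueue() (the allotment API itself is not shown in this patch):

    import java.sql.SQLException;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.iterate.ResultIterator;
    import org.apache.phoenix.iterate.TableResultIterator;
    import org.apache.phoenix.monitoring.CombinableMetric;
    import org.apache.phoenix.schema.TableRef;

    class ScanWiringSketch {
        // scanMetric is threaded through to the underlying scanner so scan-level
        // statistics can be recorded against the owning request.
        static ResultIterator open(StatementContext context, TableRef tableRef,
                CombinableMetric scanMetric) throws SQLException {
            return new TableResultIterator(context, tableRef, scanMetric);
        }
    }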

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
index b7c8b21..2296982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java
@@ -22,6 +22,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -39,14 +42,22 @@ public class UnionResultIterators implements ResultIterators {
     private final List<List<Scan>> scans;
     private final List<PeekingResultIterator> iterators;
     private final List<QueryPlan> plans;
-
-    public UnionResultIterators(List<QueryPlan> plans) throws SQLException {
+    private final List<ReadMetricQueue> readMetricsList;
+    private final List<OverAllQueryMetrics> overAllQueryMetricsList;
+    private boolean closed;
+    private final StatementContext parentStmtCtx;
+    public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCtx) throws SQLException {
+        this.parentStmtCtx = parentStmtCtx;
         this.plans = plans;
         int nPlans = plans.size();
         iterators = Lists.newArrayListWithExpectedSize(nPlans);
         splits = Lists.newArrayListWithExpectedSize(nPlans * 30); 
         scans = Lists.newArrayListWithExpectedSize(nPlans * 10); 
+        readMetricsList = Lists.newArrayListWithCapacity(nPlans);
+        overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans);
         for (QueryPlan plan : this.plans) {
+            readMetricsList.add(plan.getContext().getReadMetricsQueue());
+            overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics());
             iterators.add(LookAheadResultIterator.wrap(plan.iterator()));
             splits.addAll(plan.getSplits()); 
             scans.addAll(plan.getScans());
@@ -59,32 +70,47 @@ public class UnionResultIterators implements ResultIterators {
     }
 
     @Override
-    public void close() throws SQLException {   
-        SQLException toThrow = null;
-        try {
-            if (iterators != null) {
-                for (int index=0; index < iterators.size(); index++) {
-                    PeekingResultIterator iterator = iterators.get(index);
-                    try {
-                        iterator.close();
-                    } catch (Exception e) {
-                        if (toThrow == null) {
-                            toThrow = ServerUtil.parseServerException(e);
-                        } else {
-                            toThrow.setNextException(ServerUtil.parseServerException(e));
+    public void close() throws SQLException {
+        if (!closed) {
+            closed = true;
+            SQLException toThrow = null;
+            try {
+                if (iterators != null) {
+                    for (int index=0; index < iterators.size(); index++) {
+                        PeekingResultIterator iterator = iterators.get(index);
+                        try {
+                            iterator.close();
+                        } catch (Exception e) {
+                            if (toThrow == null) {
+                                toThrow = ServerUtil.parseServerException(e);
+                            } else {
+                                toThrow.setNextException(ServerUtil.parseServerException(e));
+                            }
                         }
                     }
                 }
-            }
-        } catch (Exception e) {
-            toThrow = ServerUtil.parseServerException(e);
-        } finally {
-            if (toThrow != null) {
-                throw toThrow;
+            } catch (Exception e) {
+                toThrow = ServerUtil.parseServerException(e);
+            } finally {
+                setMetricsInParentContext();
+                if (toThrow != null) {
+                    throw toThrow;
+                }
             }
         }
     }
-
+    
+    private void setMetricsInParentContext() {
+        ReadMetricQueue parentCtxReadMetrics = parentStmtCtx.getReadMetricsQueue();
+        for (ReadMetricQueue readMetrics : readMetricsList) {
+            parentCtxReadMetrics.combineReadMetrics(readMetrics);
+        }
+        OverAllQueryMetrics parentCtxQueryMetrics = parentStmtCtx.getOverallQueryMetrics();
+        for (OverAllQueryMetrics metric : overAllQueryMetricsList) {
+            parentCtxQueryMetrics.combine(metric);
+        }
+    }
+    
     @Override
     public List<List<Scan>> getScans() {
         return scans;
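
The union iterator now takes the parent statement context so that, when it is closed, the read metrics and overall query metrics collected by each child plan are combined back into the parent (see setMetricsInParentContext() above). A sketch of the construction, assuming the caller owns both the child plans and the parent context:

    import java.sql.SQLException;
    import java.util.List;
    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.iterate.ResultIterators;
    import org.apache.phoenix.iterate.UnionResultIterators;

    class UnionWiringSketch {
        static ResultIterators forUnion(List<QueryPlan> childPlans, StatementContext parentCtx) throws SQLException {
            // Each child plan keeps its own context; close() rolls their metrics up into parentCtx.
            return new UnionResultIterators(childPlans, parentCtx);
        }
    }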

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index dad60c1..5805999 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -123,7 +123,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final Properties info;
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private MutationState mutationState;
+    private final MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
@@ -137,9 +137,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
-    private Map<String, String> customTracingAnnotations = emptyMap(); 
     private Consistency consistency = Consistency.STRONG;
-
+    private Map<String, String> customTracingAnnotations = emptyMap();
+    private final boolean isRequestLevelMetricsEnabled;
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -237,6 +237,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
                          ! Objects.equal(tenantId, function.getTenantId()));
             }
         };
+        this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
         this.mutationState = newMutationState(maxSize);
         this.metaData = metaData.pruneTables(pruner);
         this.metaData = metaData.pruneFunctions(pruner);
@@ -438,6 +439,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             return;
         }
         try {
+            clearMetrics();
             try {
                 if (traceScope != null) {
                     traceScope.close();
@@ -866,4 +868,23 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public void setTraceScope(TraceScope traceScope) {
         this.traceScope = traceScope;
     }
+    
+    public Map<String, Map<String, Long>> getMutationMetrics() {
+        return mutationState.getMutationMetricQueue().aggregate();
+    }
+    
+    public Map<String, Map<String, Long>> getReadMetrics() {
+        return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<String, Long>>emptyMap();
+    }
+    
+    public boolean isRequestLevelMetricsEnabled() {
+        return isRequestLevelMetricsEnabled;
+    }
+    
+    public void clearMetrics() {
+        mutationState.getMutationMetricQueue().clearMetrics();
+        if (mutationState.getReadMetricQueue() != null) {
+            mutationState.getReadMetricQueue().clearMetrics();
+        }
+    }
 }
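
PhoenixConnection now exposes the aggregated request-level metrics directly. A usage sketch, assuming the caller unwraps the JDBC connection and that request-level metric collection was enabled for it:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Map;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    class ConnectionMetricsSketch {
        static void dump(Connection conn) throws SQLException {
            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
            if (pconn.isRequestLevelMetricsEnabled()) {
                // presumably keyed per table name, with per-metric values inside
                Map<String, Map<String, Long>> mutationMetrics = pconn.getMutationMetrics();
                Map<String, Map<String, Long>> readMetrics = pconn.getReadMetrics();
                System.out.println("mutation metrics: " + mutationMetrics);
                System.out.println("read metrics: " + readMetrics);
                pconn.clearMetrics(); // reset the accumulated metrics once consumed
            }
        }
    }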

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index d1b3b27..2dd8af4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -311,7 +312,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
-        this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection));
+        this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
     }
 
@@ -509,11 +510,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
                 public PhoenixStatement newStatement(PhoenixConnection connection) {
                     return new PhoenixStatement(connection) {
                         @Override
-                        protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector)
-                                throws SQLException {
-                            return new PhoenixResultSet(
-                                    new TenantColumnFilteringIterator(iterator, projector),
-                                    projector, this);
+                        protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector,
+                                StatementContext context) throws SQLException {
+                            return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector),
+                                    projector, context);
                         }
                     };
                 }
@@ -523,7 +523,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
         }
         return stmt.executeQuery(buf.toString());
     }
-
+    
+//    private ColumnResolver getColumnResolverForCatalogTable() throws SQLException {
+//        TableRef tableRef = new TableRef(getTable(connection, SYSTEM_CATALOG_NAME));
+//        return FromCompiler.getResolver(tableRef);
+//    }
+    
     /**
      * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
      * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
@@ -1007,7 +1012,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     }
     @Override
     public ResultSet getTableTypes() throws SQLException {
-        return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new PhoenixStatement(connection));
+        return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 8ee56ea..da06370 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -39,16 +39,21 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.text.Format;
 import java.util.Calendar;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.monitoring.OverAllQueryMetrics;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -109,18 +114,25 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     private final ResultIterator scanner;
     private final RowProjector rowProjector;
     private final PhoenixStatement statement;
+    private final StatementContext context;
+    private final ReadMetricQueue readMetricsQueue;
+    private final OverAllQueryMetrics overAllQueryMetrics;
     private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
     private Tuple currentRow = BEFORE_FIRST;
     private boolean isClosed = false;
     private boolean wasNull = false;
-
-    public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, PhoenixStatement statement) throws SQLException {
+    private boolean firstRecordRead = false;
+    
+    public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException {
         this.rowProjector = rowProjector;
         this.scanner = resultIterator;
-        this.statement = statement;
+        this.context = ctx;
+        this.statement = context.getStatement();
+        this.readMetricsQueue = context.getReadMetricsQueue();
+        this.overAllQueryMetrics = context.getOverallQueryMetrics();
     }
-
+    
     @Override
     public boolean absolute(int row) throws SQLException {
         throw new SQLFeatureNotSupportedException();
@@ -147,14 +159,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
 
     @Override
     public void close() throws SQLException {
-        if (isClosed) {
-            return;
-        }
+        if (isClosed) { return; }
         try {
             scanner.close();
         } finally {
             isClosed = true;
             statement.getResultSets().remove(this);
+            overAllQueryMetrics.endQuery();
+            overAllQueryMetrics.stopResultSetWatch();
         }
     }
 
@@ -754,6 +766,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     public boolean next() throws SQLException {
         checkOpen();
         try {
+            if (!firstRecordRead) {
+                firstRecordRead = true;
+                overAllQueryMetrics.startResultSetWatch();
+            }
             currentRow = scanner.next();
             rowProjector.reset();
         } catch (RuntimeException e) {
@@ -764,6 +780,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
             }
             throw e;
         }
+        if (currentRow == null) {
+            overAllQueryMetrics.endQuery();
+            overAllQueryMetrics.stopResultSetWatch();
+        }
         return currentRow != null;
     }
 
@@ -1261,4 +1281,18 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho
     public ResultIterator getUnderlyingIterator() {
         return scanner;
     }
+    
+    public Map<String, Map<String, Long>> getReadMetrics() {
+        return readMetricsQueue.aggregate();
+    }
+
+    public Map<String, Long> getOverAllRequestReadMetrics() {
+        return overAllQueryMetrics.publish();
+    }
+    
+    public void resetMetrics() {
+        readMetricsQueue.clearMetrics();
+        overAllQueryMetrics.reset();
+    }
+
 }
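
On the read side the same metrics are reachable from the result set itself: next() starts the result-set stopwatch on the first row and stops it once the iterator is exhausted or closed. A usage sketch, assuming the caller unwraps the JDBC ResultSet after draining it:

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Map;
    import org.apache.phoenix.jdbc.PhoenixResultSet;

    class ResultSetMetricsSketch {
        static void report(ResultSet rs) throws SQLException {
            while (rs.next()) {
                // drain the result set so the overall query metrics are finalized
            }
            PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
            Map<String, Map<String, Long>> readMetrics = prs.getReadMetrics();   // per-table read metrics
            Map<String, Long> overall = prs.getOverAllRequestReadMetrics();      // whole-query metrics
            System.out.println(readMetrics);
            System.out.println(overall);
            prs.resetMetrics(); // clear once published
        }
    }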

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7c94d62..c6c5b0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,9 +17,9 @@
  */
 package org.apache.phoenix.jdbc;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
 
 import java.io.IOException;
 import java.io.Reader;
@@ -216,8 +216,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         return resultSets;
     }
     
-    protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
-        return new PhoenixResultSet(iterator, projector, this);
+    protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException {
+        return new PhoenixResultSet(iterator, projector, context);
     }
     
     protected boolean execute(final CompilableStatement stmt) throws SQLException {
@@ -235,7 +235,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     }
     
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException {
-        QUERY_COUNT.increment();
+        GLOBAL_SELECT_SQL_COUNTER.increment();
         try {
             return CallRunner.run(
                 new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() {
@@ -253,7 +253,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                             String explainPlan = QueryUtil.getExplainPlan(resultIterator);
                             logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection));
                         }
-                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector());
+                        StatementContext context = plan.getContext();
+                        context.getOverallQueryMetrics().startQuery();
+                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
                         resultSets.add(rs);
                         setLastQueryPlan(plan);
                         setLastResultSet(rs);
@@ -272,7 +274,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                         // Regardless of whether the query was successfully handled or not, 
                         // update the time spent so far. If needed, we can separate out the
                         // success times and failure times.
-                        QUERY_TIME.update(System.currentTimeMillis() - startTime);
+                        GLOBAL_QUERY_TIME.update(System.currentTimeMillis() - startTime);
                     }
                 }
                 }, PhoenixContextExecutor.inContext());
@@ -288,7 +290,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 SQLExceptionCode.READ_ONLY_CONNECTION).
                 build().buildException();
         }
-           MUTATION_COUNT.increment();
+           GLOBAL_MUTATION_SQL_COUNTER.increment();
         try {
             return CallRunner
                     .run(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
index 31ef742..7406e46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -17,11 +17,11 @@
  */
 package org.apache.phoenix.job;
 
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.TASK_COUNT;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME;
-import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_QUEUE_WAIT_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -36,6 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.annotation.Nullable;
+
+import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 /**
  * 
@@ -63,6 +67,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
 
     public static interface JobRunnable<T> extends Runnable {
         public Object getJobId();
+        public TaskExecutionMetricsHolder getTaskExecutionMetric();
     }
 
     public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) {
@@ -117,13 +122,17 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
      */
     static class JobFutureTask<T> extends FutureTask<T> {
         private final Object jobId;
+        @Nullable
+        private final TaskExecutionMetricsHolder taskMetric;
         
         public JobFutureTask(Runnable r, T t) {
             super(r, t);
             if(r instanceof JobRunnable){
                this.jobId = ((JobRunnable)r).getJobId();
+               this.taskMetric = ((JobRunnable)r).getTaskExecutionMetric();
             } else {
                this.jobId = this;
+               this.taskMetric = null;
             }
         }
         
@@ -132,8 +141,10 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             // FIXME: this fails when executor used by hbase
             if (c instanceof JobCallable) {
                 this.jobId = ((JobCallable<T>) c).getJobId();
+                this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric();
             } else {
                 this.jobId = this;
+                this.taskMetric = null;
             }
         }
         
@@ -187,6 +198,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
      */
     public static interface JobCallable<T> extends Callable<T> {
         public Object getJobId();
+        public TaskExecutionMetricsHolder getTaskExecutionMetric();
     }
 
 
@@ -224,27 +236,40 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
         private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-                REJECTED_TASK_COUNT.increment();
+                TaskExecutionMetricsHolder metrics = getRequestMetric(r);
+                if (metrics != null) {
+                    metrics.getNumRejectedTasks().increment();
+                }
+                GLOBAL_REJECTED_TASK_COUNTER.increment();
                 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
             }
         };
 
-        public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+        public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize,
+                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
             setRejectedExecutionHandler(rejectedExecHandler);
         }
 
         @Override
         public void execute(Runnable task) {
-            TASK_COUNT.increment();
+            TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+            if (metrics != null) {
+                metrics.getNumTasks().increment();
+            }
+            GLOBAL_TASK_EXECUTED_COUNTER.increment();
             super.execute(task);
         }
 
         @Override
         protected void beforeExecute(Thread worker, Runnable task) {
             InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task;
-            TASK_QUEUE_WAIT_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+            long queueWaitTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+            GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime);
+            TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+            if (metrics != null) {
+                metrics.getTaskQueueWaitTime().change(queueWaitTime);
+            }
             super.beforeExecute(worker, instrumentedTask);
         }
 
@@ -254,10 +279,21 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> {
             try {
                 super.afterExecute(instrumentedTask, t);
             } finally {
-                TASK_EXECUTION_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime());
-                TASK_END_TO_END_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime());
+                long taskExecutionTime = System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime();
+                long endToEndTaskTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime();
+                TaskExecutionMetricsHolder metrics = getRequestMetric(task);
+                if (metrics != null) {
+                    metrics.getTaskExecutionTime().change(taskExecutionTime);
+                    metrics.getTaskEndToEndTime().change(endToEndTaskTime);
+                }
+                GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime);
+                GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime);
             }
         }
+
+        private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) {
+            return ((JobFutureTask)task).taskMetric;
+        }
     }
 }
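
JobRunnable and JobCallable both gain a getTaskExecutionMetric() hook, and the instrumented pool only records per-task numbers when that hook returns a non-null holder (the global task counters are always updated). A minimal sketch, not from the patch, of a callable that opts out of per-task metrics; the work inside call() is a placeholder:

    import org.apache.phoenix.job.JobManager.JobCallable;
    import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;

    class NoPerTaskMetricsJob implements JobCallable<Void> {
        @Override
        public Void call() throws Exception {
            // per-task work goes here
            return null;
        }

        @Override
        public Object getJobId() {
            // the job id groups related tasks for round-robin scheduling
            return NoPerTaskMetricsJob.class;
        }

        @Override
        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
            // null is tolerated: JobFutureTask stores it and the instrumented
            // executor null-checks getRequestMetric(...) before recording
            return null;
        }
    }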
 
