PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b5607eef
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b5607eef
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b5607eef

Branch: refs/heads/4.x-HBase-0.98
Commit: b5607eeff5fc1648e4c98f3d01f29b82ae7be1bd
Parents: 75e8f70
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Mon May 14 14:10:44 2018 -0700
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Mon May 14 14:10:44 2018 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/QueryLoggerIT.java   |  74 ++++++----
 .../phoenix/monitoring/PhoenixMetricsIT.java    |  11 +-
 .../phoenix/compile/StatementContext.java       |   4 +-
 .../apache/phoenix/execute/MutationState.java   |   4 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  10 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  39 +++---
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  21 ++-
 .../java/org/apache/phoenix/log/LogLevel.java   |   2 +-
 .../java/org/apache/phoenix/log/LogWriter.java  |   6 +-
 .../org/apache/phoenix/log/QueryLogInfo.java    |  38 +++--
 .../org/apache/phoenix/log/QueryLogState.java   |  22 ---
 .../org/apache/phoenix/log/QueryLogger.java     |  74 +++++++---
 .../org/apache/phoenix/log/QueryLoggerUtil.java |  62 ++++-----
 .../org/apache/phoenix/log/QueryStatus.java     |  22 +++
 .../org/apache/phoenix/log/RingBufferEvent.java |  38 +++--
 .../phoenix/log/RingBufferEventTranslator.java  |  21 ++-
 .../org/apache/phoenix/log/TableLogWriter.java  | 139 ++++++++++++-------
 .../phoenix/mapreduce/PhoenixRecordReader.java  |  10 +-
 .../phoenix/monitoring/MemoryMetricsHolder.java |   1 -
 .../apache/phoenix/monitoring/MetricType.java   | 112 ++++++++++-----
 .../apache/phoenix/monitoring/MetricUtil.java   |  30 ++++
 .../phoenix/monitoring/MutationMetricQueue.java |  18 ++-
 .../phoenix/monitoring/OverAllQueryMetrics.java |  21 ++-
 .../phoenix/monitoring/ReadMetricQueue.java     |  23 +--
 .../monitoring/SpoolingMetricsHolder.java       |   3 +-
 .../monitoring/TaskExecutionMetricsHolder.java  |   4 +-
 .../query/ConnectionQueryServicesImpl.java      |   7 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   7 +-
 .../apache/phoenix/query/QueryConstants.java    | 124 ++---------------
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../apache/phoenix/schema/MetaDataClient.java   |   2 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |   7 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |   4 +
 .../iterate/SpoolingResultIteratorTest.java     |   7 +-
 35 files changed, 533 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
index d6cc096..30d3222 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
@@ -26,11 +26,11 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITER
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -52,8 +52,10 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
@@ -76,6 +78,19 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
     } 
     
+    private static class MyClock extends EnvironmentEdge {
+        public volatile long time;
+
+        public MyClock (long time) {
+            this.time = time;
+        }
+
+        @Override
+        public long currentTime() {
+            return time;
+        }
+    }
+    
 
     @Test
     public void testDebugLogs() throws Exception {
@@ -96,12 +111,13 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         ResultSet explainRS = conn.createStatement().executeQuery("Explain " + 
query);
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -112,10 +128,9 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), 
context.getScan().toJSON());
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS), 
QueryLogState.COMPLETED.toString());
-                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(QUERY_STATUS), 
QueryStatus.COMPLETED.toString());
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+                assertTrue(rs.getString(SCAN_METRICS_JSON)==null);
                 assertEquals(rs.getString(EXCEPTION_TRACE),null);
             }else{
                 //confirm we are not logging system queries
@@ -138,7 +153,10 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         String query = "SELECT * FROM " + tableName;
         int count=100;
         for (int i = 0; i < count; i++) {
-            conn.createStatement().executeQuery(query);
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            while(rs.next()){
+                
+            }
         }
         
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
@@ -176,12 +194,12 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         }
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -189,12 +207,10 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
                 assertEquals(rs.getString(EXPLAIN_PLAN), null);
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null);
-                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS),null);
-                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                
assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString());
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null);
             }
         }
         assertTrue(foundQueryLog);
@@ -220,12 +236,12 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         }
 
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -253,7 +269,9 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         props.setProperty(QueryServices.LOG_LEVEL, loglevel.name());
         Connection conn = DriverManager.getConnection(getUrl(),props);
         
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel);
-        
+        final MyClock clock = new MyClock(100);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try{
         String query = "SELECT * FROM " + tableName +" where V = ?";
         
         PreparedStatement pstmt = conn.prepareStatement(query);
@@ -268,12 +286,12 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
         ResultSet explainRS = conn.createStatement()
                 .executeQuery("Explain " + "SELECT * FROM " + tableName + " 
where V = 'value5'");
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
-        rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
-        
+
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
             if (rs.getString(QUERY_ID).equals(queryId)) {
                 foundQueryLog = true;
@@ -284,14 +302,16 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), 
context.getScan().toJSON());
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1);
                 assertEquals(rs.getString(QUERY), query);
-                assertEquals(rs.getString(QUERY_STATUS), 
QueryLogState.COMPLETED.toString());
-                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(QUERY_STATUS), 
QueryStatus.COMPLETED.toString());
+                assertEquals(rs.getTimestamp(START_TIME).getTime(),100);
                 assertEquals(rs.getString(TENANT_ID), null);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
             }
         }
         assertTrue(foundQueryLog);
         conn.close();
+        }finally{
+            EnvironmentEdgeManager.injectEdge(null);
+        }
     }
     
     
@@ -313,14 +333,14 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
             assertEquals(e.getErrorCode(), 
SQLExceptionCode.TABLE_UNDEFINED.getErrorCode());
         }
         String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
-        ResultSet rs = conn.createStatement().executeQuery(logQuery);
-        boolean foundQueryLog = false;
         int delay = 5000;
 
         // sleep for sometime to let query log committed
         Thread.sleep(delay);
+        ResultSet rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
         while (rs.next()) {
-            if 
(QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
+            if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
                 foundQueryLog = true;
                 assertEquals(rs.getString(USER), 
System.getProperty("user.name"));
                 assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
@@ -329,8 +349,6 @@ public class QueryLoggerIT extends 
BaseUniqueNamesOwnClusterIT {
                 assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
                 assertEquals(rs.getString(QUERY), query);
                 
assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage()));
-                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
-                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
             }
         }
         assertTrue(foundQueryLog);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 81428e6..4c5de79 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -60,11 +60,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixMetricsLog;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -94,6 +95,7 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
         // disable renewing leases as this will force spooling to happen.
         props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString());
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
         // need the non-test driver for some tests that check number of 
hconnections, etc.
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
@@ -352,6 +354,7 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         insertRowsInTable(tableName, numRows);
         Properties props = new Properties();
         props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
"false");
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name());
         Connection conn = DriverManager.getConnection(getUrl(), props);
         ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + 
tableName);
         while (rs.next()) {}
@@ -681,7 +684,7 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
 
     private void changeInternalStateForTesting(PhoenixResultSet rs) {
         // get and set the internal state for testing purposes.
-        ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true);
+        ReadMetricQueue testMetricsQueue = new 
TestReadMetricsQueue(LogLevel.DEBUG);
         StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, 
"context");
         Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
         Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
@@ -754,8 +757,8 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
 
     private class TestReadMetricsQueue extends ReadMetricQueue {
 
-        public TestReadMetricsQueue(boolean isRequestMetricsEnabled) {
-            super(isRequestMetricsEnabled);
+        public TestReadMetricsQueue(LogLevel connectionLogLevel) {
+            super(connectionLogLevel);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 3ea5dd5..4358ee3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -135,8 +135,8 @@ public class StatementContext {
         this.dataColumns = this.currentTable == null ? Collections.<PColumn, 
Integer> emptyMap() : Maps
                 .<PColumn, Integer> newLinkedHashMap();
         this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
-        this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled);
-        this.overAllQueryMetrics = new 
OverAllQueryMetrics(isRequestMetricsEnabled);
+        this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel());
+        this.overAllQueryMetrics = new 
OverAllQueryMetrics(connection.getLogLevel());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0f995b3..1e75b75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -39,8 +39,8 @@ import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -1182,7 +1182,7 @@ public class MutationState implements SQLCloseable {
                                                numFailedMutations = 
uncommittedStatementIndexes.length;
                                                
GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
                     } finally {
-                       MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, 
numFailedMutations);
+                       MutationMetric mutationsMetric = new 
MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, 
mutationCommitTime, numFailedMutations);
                         
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
                         try {
                             if (cache!=null) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index bb3d447..43ad427 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -356,9 +356,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
                                 function.getTenantId())));
             }
         };
-        this.isRequestLevelMetricsEnabled = JDBCUtil
-                .isCollectingRequestLevelMetricsEnabled(url, info,
-                        this.services.getProps());
+        this.logLevel= 
LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
+                QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        this.isRequestLevelMetricsEnabled = 
JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info,
+                this.services.getProps());
         this.mutationState = mutationState == null ? newMutationState(maxSize,
                 maxSizeBytes) : new MutationState(mutationState);
         this.metaData = metaData;
@@ -372,8 +373,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         this.scannerQueue = new LinkedBlockingQueue<>();
         this.tableResultIteratorFactory = new 
DefaultTableResultIteratorFactory();
         this.isRunningUpgrade = isRunningUpgrade;
-        this.logLevel= 
LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
-                QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        
         this.logSamplingRate = 
Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
                 QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
         GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 909b59d..1bc47db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -51,8 +51,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
 import org.apache.phoenix.log.QueryLogger;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.OverAllQueryMetrics;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -76,8 +76,6 @@ import org.apache.phoenix.util.SQLCloseable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 
 
 
@@ -133,9 +131,9 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
 
     private Long count = 0L;
 
-    private QueryLogState logStatus = QueryLogState.COMPLETED;
+    private Object exception;
+
 
-    private RuntimeException exception;
     
     public PhoenixResultSet(ResultIterator resultIterator, RowProjector 
rowProjector, StatementContext ctx) throws SQLException {
         this.rowProjector = rowProjector;
@@ -144,7 +142,7 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
         this.statement = context.getStatement();
         this.readMetricsQueue = context.getReadMetricsQueue();
         this.overAllQueryMetrics = context.getOverallQueryMetrics();
-        this.queryLogger = context.getQueryLogger();
+        this.queryLogger = context.getQueryLogger() != null ? 
context.getQueryLogger() : QueryLogger.NO_OP_INSTANCE;
     }
     
     @Override
@@ -181,6 +179,14 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
             statement.getResultSets().remove(this);
             overAllQueryMetrics.endQuery();
             overAllQueryMetrics.stopResultSetWatch();
+            if (!queryLogger.isSynced()) {
+                if(this.exception==null){
+                    
queryLogger.log(QueryLogInfo.QUERY_STATUS_I,QueryStatus.COMPLETED.toString());
+                }
+                queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
+                // if not already synced , like closing before result set 
exhausted
+                queryLogger.sync(getReadMetrics(), 
getOverAllRequestReadMetrics());
+            }
         }
     }
 
@@ -799,27 +805,22 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
             }
             rowProjector.reset();
         } catch (RuntimeException e) {
-            this.logStatus=QueryLogState.FAILED;
             // FIXME: Expression.evaluate does not throw SQLException
             // so this will unwrap throws from that.
+            queryLogger.log(QueryLogInfo.QUERY_STATUS_I, 
QueryStatus.FAILED.toString());
+            if (queryLogger.isDebugEnabled()) {
+                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
+            }
             this.exception = e;
             if (e.getCause() instanceof SQLException) {
                 throw (SQLException) e.getCause();
             }
             throw e;
         }finally{
-            if (currentRow == null && queryLogger != null ) {
-                if (queryLogger.isDebugEnabled()) {
-                    Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
-                    queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, 
count);
-                    queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
-                            System.currentTimeMillis() - 
queryLogger.getStartTime());
-                    
-                    if (this.exception != null) {
-                        queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I,
-                                
Throwables.getStackTraceAsString(this.exception));
-                    }
-                    queryLogger.log(logStatus, queryLogBuilder.build());
+            if (this.exception != null) {
+                queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count);
+                if (queryLogger != null) {
+                    queryLogger.sync(getReadMetrics(), 
getOverAllRequestReadMetrics());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 29edb7f..5d58688 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -93,7 +93,7 @@ import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.log.QueryLogInfo;
-import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.log.QueryLogger;
 import org.apache.phoenix.log.QueryLoggerUtil;
 import org.apache.phoenix.optimize.Cost;
@@ -188,8 +188,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.math.IntMath;
@@ -316,10 +314,8 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                         StatementContext context = plan.getContext();
                         context.setQueryLogger(queryLogger);
                         if(queryLogger.isDebugEnabled()){
-                            Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
-                            queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, 
QueryUtil.getExplainPlan(resultIterator));
-                            
queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, 
context.getScan()!=null?context.getScan().toString():null);
-                            queryLogger.log(QueryLogState.COMPILED, 
queryLogBuilder.build());
+                            queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I, 
QueryUtil.getExplainPlan(resultIterator));
+                            
queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, 
context.getScan()!=null?context.getScan().toString():null);
                         }
                         context.getOverallQueryMetrics().startQuery();
                         PhoenixResultSet rs = newResultSet(resultIterator, 
plan.getProjector(), plan.getContext());
@@ -348,6 +344,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                         }
                         throw e;
                     }catch (RuntimeException e) {
+                        
                         // FIXME: Expression.evaluate does not throw 
SQLException
                         // so this will unwrap throws from that.
                         if (e.getCause() instanceof SQLException) {
@@ -364,11 +361,9 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 }, PhoenixContextExecutor.inContext());
         }catch (Exception e) {
             if (queryLogger.isDebugEnabled()) {
-                Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
-                queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
-                        System.currentTimeMillis() - 
queryLogger.getStartTime());
-                queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
-                queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build());
+                queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
+                queryLogger.log(QueryLogInfo.QUERY_STATUS_I, 
QueryStatus.FAILED.toString());
+                queryLogger.sync(null, null);
             }
             Throwables.propagateIfInstanceOf(e, SQLException.class);
             Throwables.propagate(e);
@@ -1695,7 +1690,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
         QueryLogger queryLogger = 
QueryLogger.getInstance(connection,isSystemTable);
         QueryLoggerUtil.logInitialDetails(queryLogger, 
connection.getTenantId(),
-                connection.getQueryServices(), sql, 
queryLogger.getStartTime(), getParameters());
+                connection.getQueryServices(), sql, getParameters());
         return queryLogger;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
index 5792658..269b4f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
@@ -18,5 +18,5 @@
 package org.apache.phoenix.log;
 
 public enum LogLevel {
-    OFF, INFO, DEBUG, TRACE
+    OFF,INFO, DEBUG, TRACE
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
index 817f9ec..dab03e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
@@ -31,16 +31,18 @@ public interface LogWriter {
      * @param event
      * @throws SQLException
      * @throws IOException
+     * @throws ClassNotFoundException 
      */
-    void write(RingBufferEvent event) throws SQLException, IOException;
+    void write(RingBufferEvent event) throws SQLException, IOException, 
ClassNotFoundException;
 
     /**
      * will be called when disruptor is getting shutdown
      * 
      * @throws IOException
+     * @throws SQLException 
      */
 
-    void close() throws IOException;
+    void close() throws IOException, SQLException;
 
     /**
      * if writer is closed and cannot write further event

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
index 87de267..fb38ba2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
@@ -28,8 +28,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
 
 import org.apache.phoenix.schema.types.PDataType;
@@ -40,29 +40,27 @@ import org.apache.phoenix.schema.types.PVarchar;
 
 public enum QueryLogInfo {
     
-    CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, 
PVarchar.INSTANCE),
-    QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, 
LogLevel.TRACE,PVarchar.INSTANCE),
-    QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, 
LogLevel.INFO,PVarchar.INSTANCE),
-    TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, 
LogLevel.INFO,PVarchar.INSTANCE),
-    START_TIME_I(START_TIME,QueryLogState.STARTED, 
LogLevel.INFO,PTimestamp.INSTANCE),
-    USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
-    EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
-    GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
-    NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PLong.INSTANCE),
-    EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
-    QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
-    TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PLong.INSTANCE),
-    SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE);
+    CLIENT_IP_I(CLIENT_IP, LogLevel.INFO, PVarchar.INSTANCE),
+    QUERY_I(QUERY, LogLevel.INFO,PVarchar.INSTANCE),
+    BIND_PARAMETERS_I(BIND_PARAMETERS, LogLevel.TRACE,PVarchar.INSTANCE),
+    QUERY_ID_I(QUERY_ID, LogLevel.INFO,PVarchar.INSTANCE),
+    TENANT_ID_I(TENANT_ID, LogLevel.INFO,PVarchar.INSTANCE),
+    START_TIME_I(START_TIME, LogLevel.INFO,PTimestamp.INSTANCE),
+    USER_I(USER, LogLevel.INFO,PVarchar.INSTANCE),
+    EXPLAIN_PLAN_I(EXPLAIN_PLAN,LogLevel.DEBUG,PVarchar.INSTANCE),
+    GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS, 
LogLevel.DEBUG,PVarchar.INSTANCE),
+    NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED, 
LogLevel.INFO,PLong.INSTANCE),
+    EXCEPTION_TRACE_I(EXCEPTION_TRACE, LogLevel.DEBUG,PVarchar.INSTANCE),
+    QUERY_STATUS_I(QUERY_STATUS, LogLevel.INFO,PVarchar.INSTANCE),
+    SCAN_METRICS_JSON_I(SCAN_METRICS_JSON, LogLevel.TRACE,PVarchar.INSTANCE), 
+    TABLE_NAME_I(TABLE_NAME, LogLevel.DEBUG,PVarchar.INSTANCE);
     
     public final String columnName;
-    public final QueryLogState logState;
     public final LogLevel logLevel;
     public final PDataType dataType;
 
-    private QueryLogInfo(String columnName, QueryLogState logState, LogLevel 
logLevel, PDataType dataType) {
+    private QueryLogInfo(String columnName, LogLevel logLevel, PDataType 
dataType) {
         this.columnName = columnName;
-        this.logState=logState;
         this.logLevel=logLevel;
         this.dataType=dataType;
     }
@@ -71,10 +69,6 @@ public enum QueryLogInfo {
         return columnName;
     }
 
-    public QueryLogState getLogState() {
-        return logState;
-    }
-
     public LogLevel getLogLevel() {
         return logLevel;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
deleted file mode 100644
index e27f0e8..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.log;
-
-public enum QueryLogState {
-    STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED 
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
index b2fb235..ef5559c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -17,13 +17,17 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 import io.netty.util.internal.ThreadLocalRandom;
 
@@ -34,15 +38,17 @@ public class QueryLogger {
     private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator 
= new ThreadLocal<>();
     private QueryLoggerDisruptor queryDisruptor;
     private String queryId;
-    private Long startTime;
     private LogLevel logLevel;
+    private Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
+    private boolean isSynced;
     private static final Log LOG = 
LogFactory.getLog(QueryLoggerDisruptor.class);
     
     private QueryLogger(PhoenixConnection connection) {
         this.queryId = UUID.randomUUID().toString();
         this.queryDisruptor = 
connection.getQueryServices().getQueryDisruptor();
-        this.startTime = System.currentTimeMillis();
         logLevel = connection.getLogLevel();
+        log(QueryLogInfo.QUERY_ID_I, queryId);
+        log(QueryLogInfo.START_TIME_I, 
EnvironmentEdgeManager.currentTimeMillis());
     }
     
     private QueryLogger() {
@@ -58,21 +64,32 @@ public class QueryLogger {
         return result;
     }
     
-    private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
+    public static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
         @Override
-        public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, 
Object> map) {
+        public void log(QueryLogInfo queryLogInfo, Object info) {
 
         }
-        
+
         @Override
-        public boolean isDebugEnabled(){
+        public boolean isDebugEnabled() {
             return false;
         }
-        
+
         @Override
-        public boolean isInfoEnabled(){
+        public boolean isInfoEnabled() {
             return false;
         }
+
+        @Override
+        public void sync(
+                Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+
+        }
+        
+        @Override
+        public boolean isSynced(){
+            return true;
+        }
     };
 
     public static QueryLogger getInstance(PhoenixConnection connection, 
boolean isSystemTable) {
@@ -82,14 +99,14 @@ public class QueryLogger {
     }
 
     /**
-     * Add query log in the table, columns will be logged depending upon the 
connection logLevel
-     * @param logState State of the query
-     * @param map Value of the map should be in format of the corresponding 
data type 
+     * Add query log in the table, columns will be logged depending upon the connection logLevel 
      */
-    public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> 
map) {
-        final RingBufferEventTranslator translator = getCachedTranslator();
-        translator.setQueryInfo(logState, map, logLevel);
-        publishLogs(translator);
+    public void log(QueryLogInfo queryLogInfo, Object info) {
+        try {
+            queryLogBuilder.put(queryLogInfo, info);
+        } catch (Exception e) {
+            LOG.warn("Unable to add log info because of " + e.getMessage());
+        }
     }
     
     private boolean publishLogs(RingBufferEventTranslator translator) {
@@ -102,13 +119,6 @@ public class QueryLogger {
     }
 
     /**
-     * Start time when the logger was started, if {@link LogLevel#OFF} then 
it's the current time
-     */
-    public Long getStartTime() {
-        return startTime != null ? startTime : System.currentTimeMillis();
-    }
-    
-    /**
      *  Is debug logging currently enabled?
      *  Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug.
      */
@@ -117,7 +127,8 @@ public class QueryLogger {
     }
     
     private boolean isLevelEnabled(LogLevel logLevel){
-        return this.logLevel != null ? logLevel.ordinal() <= 
this.logLevel.ordinal() : false;
+        return this.logLevel != null && logLevel != LogLevel.OFF ? 
logLevel.ordinal() <= this.logLevel.ordinal()
+                : false;
     }
     
     /**
@@ -142,4 +153,21 @@ public class QueryLogger {
         return this.queryId;
     }
     
+
+    public void sync(Map<String, Map<MetricType, Long>> readMetrics, 
Map<MetricType, Long> overAllMetrics) {
+        if (!isSynced) {
+            isSynced = true;
+            final RingBufferEventTranslator translator = getCachedTranslator();
+            translator.setQueryInfo(logLevel, queryLogBuilder.build(), 
readMetrics, overAllMetrics);
+            publishLogs(translator);
+        }
+    }
+    
+    /**
+     * Is Synced already
+     */
+    public boolean isSynced(){
+        return this.isSynced;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
index d5c4878..21917ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
@@ -25,48 +25,36 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PName;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
-
 public class QueryLoggerUtil {
 
+
     public static void logInitialDetails(QueryLogger queryLogger, PName 
tenantId, ConnectionQueryServices queryServices,
-            String query, long startTime, List<Object> bindParameters) {
+            String query, List<Object> bindParameters) {
         try {
-            queryLogger.log(QueryLogState.STARTED,
-                    getInitialDetails(tenantId, queryServices, query, 
startTime, bindParameters));
+            String clientIP;
+            try {
+                clientIP = InetAddress.getLocalHost().getHostAddress();
+            } catch (UnknownHostException e) {
+                clientIP = "UnknownHost";
+            }
+
+            if (clientIP != null) {
+                queryLogger.log(QueryLogInfo.CLIENT_IP_I, clientIP);
+            }
+            if (query != null) {
+                queryLogger.log(QueryLogInfo.QUERY_I, query);
+            }
+            if (bindParameters != null) {
+                queryLogger.log(QueryLogInfo.BIND_PARAMETERS_I, 
StringUtils.join(bindParameters, ","));
+            }
+            if (tenantId != null) {
+                queryLogger.log(QueryLogInfo.TENANT_ID_I, 
tenantId.getString());
+            }
+
+            queryLogger.log(QueryLogInfo.USER_I, queryServices.getUserName() 
!= null ? queryServices.getUserName()
+                    : queryServices.getUser().getShortName());
         } catch (Exception e) {
-            // Ignore for now
-        }
-
-    }
-
-    private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName 
tenantId,
-            ConnectionQueryServices queryServices, String query, long 
startTime, List<Object> bindParameters) {
-        Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
-        String clientIP;
-        try {
-            clientIP = InetAddress.getLocalHost().getHostAddress();
-        } catch (UnknownHostException e) {
-            clientIP = "UnknownHost";
+            // Ignore
         }
-
-        if (clientIP != null) {
-            queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP);
-        }
-        if (query != null) {
-            queryLogBuilder.put(QueryLogInfo.QUERY_I, query);
-        }
-        queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime);
-        if (bindParameters != null) {
-            queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, 
StringUtils.join(bindParameters, ","));
-        }
-        if (tenantId != null) {
-            queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, 
tenantId.getString());
-        }
-
-        queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() 
!= null ? queryServices.getUserName()
-                : queryServices.getUser().getShortName());
-        return queryLogBuilder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
new file mode 100644
index 0000000..0e634c1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum QueryStatus {
+    COMPILED, COMPLETED,FAILED 
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
index 96e4bf9..8854e68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
@@ -17,14 +17,19 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
 import com.google.common.collect.ImmutableMap;
 import com.lmax.disruptor.EventFactory;
 
  class RingBufferEvent {
     private String queryId;
-    private QueryLogState logState;
     private LogLevel connectionLogLevel;
     private ImmutableMap<QueryLogInfo, Object> queryInfo;
+    private Map<String, Map<MetricType, Long>> readMetrics;
+    private Map<MetricType, Long> overAllMetrics;
     
     public static final Factory FACTORY = new Factory();
     
@@ -40,7 +45,6 @@ import com.lmax.disruptor.EventFactory;
     }
 
     public void clear() {
-        this.logState=null;
         this.queryInfo=null;
         this.queryId=null;
     }
@@ -53,10 +57,6 @@ import com.lmax.disruptor.EventFactory;
     public static Factory getFactory() {
         return FACTORY;
     }
-    
-    public QueryLogState getLogState() {
-        return logState;
-    }
 
     public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) {
         this.queryInfo=queryInfo;
@@ -73,12 +73,6 @@ import com.lmax.disruptor.EventFactory;
         
     }
 
-    public void setLogState(QueryLogState logState) {
-        this.logState=logState;
-        
-    }
-
-
     public LogLevel getConnectionLogLevel() {
         return connectionLogLevel;
     }
@@ -88,6 +82,26 @@ import com.lmax.disruptor.EventFactory;
         this.connectionLogLevel = connectionLogLevel;
     }
 
+
+    public Map<String, Map<MetricType, Long>> getReadMetrics() {
+        return readMetrics;
+    }
+
+
+    public void setReadMetrics(Map<String, Map<MetricType, Long>> readMetrics) 
{
+        this.readMetrics = readMetrics;
+    }
+
+
+    public Map<MetricType, Long> getOverAllMetrics() {
+        return overAllMetrics;
+    }
+
+
+    public void setOverAllMetrics(Map<MetricType, Long> overAllMetrics) {
+        this.overAllMetrics = overAllMetrics;
+    }
+
     
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
index 653ddd6..742f8e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
@@ -17,14 +17,19 @@
  */
 package org.apache.phoenix.log;
 
+import java.util.Map;
+
+import org.apache.phoenix.monitoring.MetricType;
+
 import com.google.common.collect.ImmutableMap;
 import com.lmax.disruptor.EventTranslator;
 
 class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
     private String queryId;
-    private QueryLogState logState;
     private ImmutableMap<QueryLogInfo, Object> queryInfo;
     private LogLevel connectionLogLevel;
+    private Map<String, Map<MetricType, Long>> readMetrics;
+    private Map<MetricType, Long> overAllMetrics;
     
     public RingBufferEventTranslator(String queryId) {
         this.queryId=queryId;
@@ -34,20 +39,22 @@ class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
     public void translateTo(RingBufferEvent event, long sequence) {
         event.setQueryId(queryId);
         event.setQueryInfo(queryInfo);
-        event.setLogState(logState);
+        event.setReadMetrics(readMetrics);
+        event.setOverAllMetrics(overAllMetrics);
         event.setConnectionLogLevel(connectionLogLevel);
         clear();
     }
 
     private void clear() {
-        setQueryInfo(null,null,null);
+        setQueryInfo(null,null,null,null);
     }
    
-    public void setQueryInfo(QueryLogState logState, 
ImmutableMap<QueryLogInfo, Object> queryInfo,
-            LogLevel connectionLogLevel) {
+    public void setQueryInfo(LogLevel logLevel, ImmutableMap<QueryLogInfo, 
Object> queryInfo, Map<String, Map<MetricType, Long>> readMetrics,
+            Map<MetricType, Long> overAllMetrics) {
         this.queryInfo = queryInfo;
-        this.logState = logState;
-        this.connectionLogLevel = connectionLogLevel;
+        this.connectionLogLevel = logLevel;
+        this.readMetrics = readMetrics;
+        this.overAllMetrics=overAllMetrics;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
index 4398596..4969484 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -21,23 +21,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.expression.Determinism;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.query.HBaseFactoryProvider;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -47,58 +41,97 @@ import com.google.common.collect.ImmutableMap;
  */
 public class TableLogWriter implements LogWriter {
     private static final Log LOG = LogFactory.getLog(LogWriter.class);
-    private HConnection connection;
+    private Connection connection;
     private boolean isClosed;
-    private HTableInterface table;
     private Configuration config;
+    private PreparedStatement upsertStatement;
+    private Map<MetricType,Integer> metricOrdinals=new 
HashMap<MetricType,Integer>();
 
     public TableLogWriter(Configuration configuration) {
-        this.config = configuration;
-        try {
-            this.connection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
-            table = HBaseFactoryProvider.getHTableFactory()
-                    .getTable(SchemaUtil
-                            .getPhysicalTableName(
-                                    
SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config)
-                            .getName(), connection, null);
-        } catch (Exception e) {
-            LOG.warn("Unable to initiate LogWriter for writing query logs to 
table");
+        this.config=configuration;
+    }
+
+    private PreparedStatement buildUpsertStatement(Connection conn) throws 
SQLException {
+        StringBuilder buf = new StringBuilder("UPSERT INTO " + 
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(");
+        int queryLogEntries=0;
+        for (QueryLogInfo info : QueryLogInfo.values()) {
+            buf.append(info.columnName);
+            buf.append(',');
+            queryLogEntries++;
+        }
+        for (MetricType metric : MetricType.values()) {
+            if (metric.logLevel() != LogLevel.OFF) {
+                metricOrdinals.put(metric, ++queryLogEntries);
+                buf.append(metric.columnName());
+                buf.append(',');
+            }
         }
+        buf.setLength(buf.length()-1);
+        buf.append(") VALUES (");
+        for (int i = 0; i < QueryLogInfo.values().length; i++) {
+            buf.append("?,");
+        }
+        for (MetricType metric : MetricType.values()) {
+            if (metric.logLevel() != LogLevel.OFF) {
+                buf.append("?,");
+            }
+        }
+        buf.setLength(buf.length()-1);
+        buf.append(")");
+        return conn.prepareStatement(buf.toString());
     }
 
     @Override
-    public void write(RingBufferEvent event) throws SQLException, IOException {
+    public void write(RingBufferEvent event) throws SQLException, IOException, 
ClassNotFoundException {
         if(isClosed()){
             LOG.warn("Unable to commit query log as Log committer is already 
closed");
             return;
         }
-        if (table == null || connection == null) {
-            LOG.warn("Unable to commit query log as connection was not 
initiated ");
-            return;
+        if (connection == null) {
+            synchronized (this) {
+                if (connection == null) {
+                    connection = 
QueryUtil.getConnectionForQueryLog(this.config);
+                    this.upsertStatement = buildUpsertStatement(connection);
+                }
+            }
         }
-        ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo();
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        Put put =new Put(Bytes.toBytes(event.getQueryId()));
-        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
-            if (entry.getKey().logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()) {
-                LiteralExpression expression = 
LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType,
-                        Determinism.ALWAYS);
-                expression.evaluate(null, ptr);
-                put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes(entry.getKey().columnName),
-                        ByteUtil.copyKeyBytesIfNecessary(ptr));
+
+        ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo();
+        for (QueryLogInfo info : QueryLogInfo.values()) {
+            if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()) {
+                upsertStatement.setObject(info.ordinal() + 1, 
queryInfoMap.get(info));
+            } else {
+                upsertStatement.setObject(info.ordinal() + 1, null);
+             }
+         }
+        Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics();
+        Map<String, Map<MetricType, Long>> readMetrics = 
event.getReadMetrics();
+
+        for (MetricType metric : MetricType.values()) {
+            if (overAllMetrics != null && overAllMetrics.containsKey(metric)
+                    && metric.isLoggingEnabled(event.getConnectionLogLevel())) 
{
+                upsertStatement.setObject(metricOrdinals.get(metric), 
overAllMetrics.get(metric));
+            } else {
+                if (metric.logLevel() != LogLevel.OFF) {
+                    upsertStatement.setObject(metricOrdinals.get(metric), 
null);
+                }
             }
         }
-        
-        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()
-                && (event.getLogState() == QueryLogState.COMPLETED || 
event.getLogState() == QueryLogState.FAILED)) {
-            LiteralExpression expression = 
LiteralExpression.newConstant(event.getLogState().toString(),
-                    QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS);
-            expression.evaluate(null, ptr);
-            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                    Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+
+        if (readMetrics != null && !readMetrics.isEmpty()) {
+            for (Map.Entry<String, Map<MetricType, Long>> entry : 
readMetrics.entrySet()) {
+                upsertStatement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() 
+ 1, entry.getKey());
+                for (MetricType metric : entry.getValue().keySet()) {
+                    if 
(metric.isLoggingEnabled(event.getConnectionLogLevel())) {
+                        upsertStatement.setObject(metricOrdinals.get(metric), 
entry.getValue().get(metric));
+                    }
+                }
+                upsertStatement.executeUpdate();
+            }
+        } else {
+            upsertStatement.executeUpdate();
         }
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
-        table.put(put);
+        connection.commit();
         
     }
     
@@ -109,18 +142,16 @@ public class TableLogWriter implements LogWriter {
         }
         isClosed=true;
         try {
-            if (table != null) {
-                table.close();
-            }
-            if (connection != null && !connection.isClosed()) {
-                //It should internally close all the statements
+            if (connection != null) {
+                // It should internally close all the statements
                 connection.close();
             }
-        } catch (IOException e) {
+        } catch (SQLException e) {
             // TODO Ignore?
         }
     }
     
+    @Override
     public boolean isClosed(){
         return isClosed;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index eafff99..4fed458 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -37,7 +37,15 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.*;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.TableSnapshotResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ReadMetricQueue;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
index 0e82ce4..daa0bba 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java
@@ -26,7 +26,6 @@ import static 
org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
 public class MemoryMetricsHolder {
     private final CombinableMetric memoryChunkSizeMetric;
     private final CombinableMetric memoryWaitTimeMetric;
-    public static final MemoryMetricsHolder NO_OP_INSTANCE = new 
MemoryMetricsHolder(new ReadMetricQueue(false), null);
     
     public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
         this.memoryChunkSizeMetric = 
readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 45295a7..4582c5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -16,51 +16,62 @@
  * limitations under the License.
  */
 package org.apache.phoenix.monitoring;
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
 
+
+/**
+ * Keeping {@link LogLevel#OFF} for metrics which are calculated globally only 
and don't need to be logged in SYSTEM.LOG
+ */
 public enum MetricType {
 
-       NO_OP_METRIC("no", "No op metric"),
-       // mutation (write) related metrics 
-    MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch"),
-    MUTATION_BYTES("mb", "Size of mutations in bytes"),
-    MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"),
-    MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be 
committed"),
-    MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql 
statements"),
-    // query (read) related metrics
-    QUERY_TIME("qt", "Query times"),
-    QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"),
-    QUERY_FAILED_COUNTER("qf", "Number of times query failed"),
-    NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"),
-    SCAN_BYTES("sb", "Number of bytes read by scans"),
-    SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"),
-    // task metrics
-    TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the 
queue of the thread pool executor"),
-    TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from 
creation to completion"),
-    TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"),
-    TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the 
thread pool executor"),
-    TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were 
rejected by the thread pool executor"),
-    // spool metrics
-    SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"),
-    SPOOL_FILE_COUNTER("sn", "Number of spool files created"),
-    // misc metrics
-    MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory 
manager"),
-    MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for 
memory to be allocated through memory manager"),
-    CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed 
because of splits"),
-    WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query 
execution"),
-    RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records 
using resultSet.next()"),
-    OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix 
connections"),
-    QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl 
instantiated"),
-    HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix 
driver"),
-    PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix 
connections prevented from opening " +
-                                              "because there are already too 
many to that target cluster."),
-    PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix 
connections, whether successful or not.");
+    NO_OP_METRIC("no", "No op metric",LogLevel.OFF, PLong.INSTANCE),
+    // mutation (write) related metrics
+ MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, 
PLong.INSTANCE),
+ MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, 
PLong.INSTANCE),
+ MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of 
mutations",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be 
committed",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql 
statements",LogLevel.OFF, PLong.INSTANCE),
+ // query (read) related metrics
+ QUERY_TIME("qt", "Query times",LogLevel.OFF, PLong.INSTANCE),
+ QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out",LogLevel.DEBUG, 
PLong.INSTANCE),
+ QUERY_FAILED_COUNTER("qf", "Number of times query failed",LogLevel.DEBUG, 
PLong.INSTANCE),
+ NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in 
parallel",LogLevel.DEBUG, PLong.INSTANCE),
+ SCAN_BYTES("sb", "Number of bytes read by scans",LogLevel.OFF, 
PLong.INSTANCE),
+ SELECT_SQL_COUNTER("sc", "Counter for number of sql queries",LogLevel.OFF, 
PLong.INSTANCE),
+ // task metrics
+ TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the 
queue of the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from 
creation to completion",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to 
execute",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the 
thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+ TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected 
by the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE),
+ // spool metrics
+ SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes",LogLevel.DEBUG, 
PLong.INSTANCE),
+ SPOOL_FILE_COUNTER("sn", "Number of spool files created",LogLevel.DEBUG, 
PLong.INSTANCE),
+ // misc metrics
+ MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory 
manager",LogLevel.DEBUG, PLong.INSTANCE),
+ MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for 
memory to be allocated through memory manager",LogLevel.DEBUG, PLong.INSTANCE),
+ CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed 
because of splits",LogLevel.DEBUG, PLong.INSTANCE),
+ WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query 
execution",LogLevel.INFO, PLong.INSTANCE),
+ RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records 
using resultSet.next()",LogLevel.INFO, PLong.INSTANCE),
+ OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix 
connections",LogLevel.OFF, PLong.INSTANCE),
+ QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl 
instantiated",LogLevel.OFF, PLong.INSTANCE),
+ HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix 
driver",LogLevel.OFF, PLong.INSTANCE),
+ PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix 
connections prevented from opening " +
+                                           "because there are already too many 
to that target cluster.",LogLevel.OFF, PLong.INSTANCE),
+ PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix 
connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE);
     
     private final String description;
     private final String shortName;
+    private LogLevel logLevel;
+    private PDataType dataType;
 
-    private MetricType(String shortName, String description) {
+    private MetricType(String shortName, String description, LogLevel 
logLevel, PDataType dataType) {
        this.shortName = shortName;
         this.description = description;
+        this.logLevel=logLevel;
+        this.dataType=dataType;
     }
 
     public String description() {
@@ -70,5 +81,34 @@ public enum MetricType {
     public String shortName() {
         return shortName;
     }
+    
+    public LogLevel logLevel() {
+        return logLevel;
+    }
+    
+    public PDataType dataType() {
+        return dataType;
+    }
+    
+    public String columnName() {
+        return name();
+    }
+    
+    public boolean isLoggingEnabled(LogLevel connectionLogLevel){
+        return logLevel() != LogLevel.OFF && (logLevel().ordinal() <= 
connectionLogLevel.ordinal());
+    }
 
+    public static String getMetricColumnsDetails() {
+        StringBuilder buffer=new StringBuilder();
+        for(MetricType metric:MetricType.values()){
+            if (metric.logLevel() != LogLevel.OFF) {
+                buffer.append(metric.columnName());
+                buffer.append(" ");
+                buffer.append(metric.dataType.getSqlTypeName());
+                buffer.append(",");
+            }
+        }
+        return buffer.toString();
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
new file mode 100644
index 0000000..1e5ac08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+
+public class MetricUtil {
+
+    public static CombinableMetric getCombinableMetric(LogLevel 
connectionLogLevel, MetricType type) {
+        if (!type.isLoggingEnabled(connectionLogLevel)) { return 
NoOpRequestMetric.INSTANCE; }
+        return new CombinableMetricImpl(type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 3de2be1..1256f5c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.phoenix.log.LogLevel;
+
 /**
  * Queue that tracks various writes/mutations related phoenix request metrics.
  */
@@ -81,12 +83,16 @@ public class MutationMetricQueue {
      * Class that holds together the various metrics associated with mutations.
      */
     public static class MutationMetric {
-        private final CombinableMetric numMutations = new 
CombinableMetricImpl(MUTATION_BATCH_SIZE);
-        private final CombinableMetric mutationsSizeBytes = new 
CombinableMetricImpl(MUTATION_BYTES);
-        private final CombinableMetric totalCommitTimeForMutations = new 
CombinableMetricImpl(MUTATION_COMMIT_TIME);
-        private final CombinableMetric numFailedMutations = new 
CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);
-
-        public MutationMetric(long numMutations, long mutationsSizeBytes, long 
commitTimeForMutations, long numFailedMutations) {
+        private final CombinableMetric numMutations;;
+        private final CombinableMetric mutationsSizeBytes;
+        private final CombinableMetric totalCommitTimeForMutations;
+        private final CombinableMetric numFailedMutations;
+
+        public MutationMetric(LogLevel connectionLogLevel, long numMutations, 
long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
+            this.numMutations = 
MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_SIZE);
+            this.mutationsSizeBytes 
=MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BYTES);
+            this.totalCommitTimeForMutations 
=MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_COMMIT_TIME);
+            this.numFailedMutations = 
MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_FAILED_SIZE);
             this.numMutations.change(numMutations);
             this.mutationsSizeBytes.change(mutationsSizeBytes);
             this.totalCommitTimeForMutations.change(commitTimeForMutations);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
index b995267..3121ecd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java
@@ -27,7 +27,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
+import org.apache.phoenix.log.LogLevel;
 
 /**
  * Class that represents the overall metrics associated with a query being 
executed by the phoenix.
@@ -42,16 +42,15 @@ public class OverAllQueryMetrics {
     private final CombinableMetric queryFailed;
     private final CombinableMetric cacheRefreshedDueToSplits;
 
-    public OverAllQueryMetrics(boolean isMetricsEnabled) {
-        queryWatch = new MetricsStopWatch(isMetricsEnabled);
-        resultSetWatch = new MetricsStopWatch(isMetricsEnabled);
-        numParallelScans = isMetricsEnabled ? new 
CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE;
-        wallClockTimeMS = isMetricsEnabled ? new 
CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE;
-        resultSetTimeMS = isMetricsEnabled ? new 
CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE;
-        queryTimedOut = isMetricsEnabled ? new 
CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE;
-        queryFailed = isMetricsEnabled ? new 
CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE;
-        cacheRefreshedDueToSplits = isMetricsEnabled ? new 
CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER)
-                : NoOpRequestMetric.INSTANCE;
+    public OverAllQueryMetrics(LogLevel connectionLogLevel) {
+        queryWatch = new 
MetricsStopWatch(WALL_CLOCK_TIME_MS.isLoggingEnabled(connectionLogLevel));
+        resultSetWatch = new 
MetricsStopWatch(RESULT_SET_TIME_MS.isLoggingEnabled(connectionLogLevel));
+        numParallelScans = MetricUtil.getCombinableMetric(connectionLogLevel, 
NUM_PARALLEL_SCANS);
+        wallClockTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, 
WALL_CLOCK_TIME_MS);
+        resultSetTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, 
RESULT_SET_TIME_MS);
+        queryTimedOut = MetricUtil.getCombinableMetric(connectionLogLevel, 
QUERY_TIMEOUT_COUNTER);
+        queryFailed = MetricUtil.getCombinableMetric(connectionLogLevel, 
QUERY_FAILED_COUNTER);
+        cacheRefreshedDueToSplits = 
MetricUtil.getCombinableMetric(connectionLogLevel, 
CACHE_REFRESH_SPLITS_COUNTER);
     }
 
     public void updateNumParallelScans(long numParallelScans) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index 5c4238e..619776a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.annotation.Nonnull;
 
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -36,19 +37,23 @@ public class ReadMetricQueue {
 
     private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap 
= new ConcurrentHashMap<>();
 
-    private final boolean isRequestMetricsEnabled;
 
-    public ReadMetricQueue(boolean isRequestMetricsEnabled) {
-        this.isRequestMetricsEnabled = isRequestMetricsEnabled;
+    private LogLevel connectionLogLevel;
+
+    public ReadMetricQueue(LogLevel connectionLogLevel) {
+        this.connectionLogLevel = connectionLogLevel;
     }
 
     public CombinableMetric allotMetric(MetricType type, String tableName) {
-        if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
-        MetricKey key = new MetricKey(type, tableName);
-        Queue<CombinableMetric> q = getMetricQueue(key);
-        CombinableMetric metric = getMetric(type);
-        q.offer(metric);
-        return metric;
+        if (type.isLoggingEnabled(connectionLogLevel)) {
+            MetricKey key = new MetricKey(type, tableName);
+            Queue<CombinableMetric> q = getMetricQueue(key);
+            CombinableMetric metric = getMetric(type);
+            q.offer(metric);
+            return metric;
+        } else {
+            return NoOpRequestMetric.INSTANCE;
+        }
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
index 4373887..699982f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.monitoring;
 
+import org.apache.phoenix.log.LogLevel;
 
 /**
  * Class that encapsulates the various metrics associated with the spooling 
done by phoenix as part of servicing a
@@ -26,7 +27,7 @@ public class SpoolingMetricsHolder {
 
     private final CombinableMetric spoolFileSizeMetric;
     private final CombinableMetric numSpoolFileMetric;
-    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new 
SpoolingMetricsHolder(new ReadMetricQueue(false), "");
+    public static final SpoolingMetricsHolder NO_OP_INSTANCE = new 
SpoolingMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
 
     public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String 
tableName) {
         this.spoolFileSizeMetric = 
readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
index 98ff57c..6117b40 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java
@@ -23,6 +23,8 @@ import static 
org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER;
 
+import org.apache.phoenix.log.LogLevel;
+
 
 /**
  * Class to encapsulate the various metrics associated with submitting and 
executing a task to the phoenix client
@@ -35,7 +37,7 @@ public class TaskExecutionMetricsHolder {
     private final CombinableMetric taskExecutionTime;
     private final CombinableMetric numTasks;
     private final CombinableMetric numRejectedTasks;
-    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new 
TaskExecutionMetricsHolder(new ReadMetricQueue(false), "");
+    public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new 
TaskExecutionMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
     
     public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String 
tableName) {
         taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, 
tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 26e22a5..3bc2f48 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2466,7 +2466,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     // Available for testing
     protected String getLogTableDDL() {
-        return QueryConstants.CREATE_LOG_METADATA;
+        return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA);
+    }
+
+    private String setSystemLogDDLProperties(String ddl) {
+        return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, 
QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
+
     }
     
     private String setSystemDDLProperties(String ddl) {

Reply via email to