PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7afaceb7 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7afaceb7 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7afaceb7 Branch: refs/heads/master Commit: 7afaceb7e7355e59ae9465a02b812b230fc58edd Parents: 1966edb Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Mon May 14 13:33:22 2018 -0700 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Mon May 14 13:33:22 2018 -0700 ---------------------------------------------------------------------- bin/hbase-site.xml | 4 + .../apache/phoenix/end2end/QueryLoggerIT.java | 76 ++++++---- .../phoenix/monitoring/PhoenixMetricsIT.java | 28 ++-- .../phoenix/compile/StatementContext.java | 4 +- .../apache/phoenix/execute/MutationState.java | 4 +- .../phoenix/iterate/ChunkedResultIterator.java | 2 +- .../phoenix/iterate/ParallelIterators.java | 3 +- .../apache/phoenix/iterate/SerialIterators.java | 3 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 10 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 54 ++++--- .../apache/phoenix/jdbc/PhoenixStatement.java | 21 ++- .../java/org/apache/phoenix/log/LogLevel.java | 2 +- .../java/org/apache/phoenix/log/LogWriter.java | 6 +- .../org/apache/phoenix/log/QueryLogInfo.java | 38 +++-- .../org/apache/phoenix/log/QueryLogState.java | 22 --- .../org/apache/phoenix/log/QueryLogger.java | 74 +++++++--- .../org/apache/phoenix/log/QueryLoggerUtil.java | 62 ++++---- .../org/apache/phoenix/log/QueryStatus.java | 22 +++ .../org/apache/phoenix/log/RingBufferEvent.java | 38 +++-- .../phoenix/log/RingBufferEventTranslator.java | 21 ++- .../org/apache/phoenix/log/TableLogWriter.java | 144 +++++++++++-------- .../phoenix/mapreduce/PhoenixRecordReader.java | 13 +- .../phoenix/monitoring/MemoryMetricsHolder.java | 1 - .../apache/phoenix/monitoring/MetricType.java | 123 ++++++++++------ .../apache/phoenix/monitoring/MetricUtil.java | 30 ++++ .../phoenix/monitoring/MutationMetricQueue.java | 18 ++- .../phoenix/monitoring/OverAllQueryMetrics.java | 21 ++- .../phoenix/monitoring/ReadMetricQueue.java | 27 ++-- .../phoenix/monitoring/ScanMetricsHolder.java | 9 +- .../monitoring/SpoolingMetricsHolder.java | 3 +- .../monitoring/TaskExecutionMetricsHolder.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 7 +- .../query/ConnectionlessQueryServicesImpl.java | 7 +- .../apache/phoenix/query/QueryConstants.java | 124 ++-------------- .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../java/org/apache/phoenix/util/QueryUtil.java | 7 +- .../org/apache/phoenix/util/SchemaUtil.java | 4 + .../iterate/SpoolingResultIteratorTest.java | 7 +- .../hive/mapreduce/PhoenixRecordReader.java | 3 +- 41 files changed, 570 insertions(+), 480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/bin/hbase-site.xml ---------------------------------------------------------------------- diff --git a/bin/hbase-site.xml b/bin/hbase-site.xml index 0ab9fd8..2f360e2 100644 --- a/bin/hbase-site.xml +++ b/bin/hbase-site.xml @@ -24,4 +24,8 @@ <name>hbase.regionserver.wal.codec</name> <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value> </property> + <property> + <name>phoenix.log.level</name> + <value>DEBUG</value> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java index 940ba6f..618d7d9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java @@ -31,7 +31,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -53,8 +52,10 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.log.LogLevel; -import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; @@ -77,6 +78,19 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { DriverManager.registerDriver(PhoenixDriver.INSTANCE); } + private static class MyClock extends EnvironmentEdge { + public volatile long time; + + public MyClock (long time) { + this.time = time; + } + + @Override + public long currentTime() { + return time; + } + } + @Test public void testDebugLogs() throws Exception { @@ -97,12 +111,13 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query); String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -113,11 +128,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); - assertTrue(rs.getString(SCAN_METRICS_JSON).contains("scanMetrics")); + assertTrue(rs.getString(SCAN_METRICS_JSON)==null); assertEquals(rs.getString(EXCEPTION_TRACE),null); }else{ //confirm we are not logging system queries @@ -140,7 +153,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { String query = "SELECT * FROM " + tableName; int count=100; for (int i = 0; i < count; i++) { - conn.createStatement().executeQuery(query); + ResultSet rs = conn.createStatement().executeQuery(query); + while(rs.next()){ + + } } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; @@ -178,12 +194,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -191,12 +207,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); assertEquals(rs.getString(EXPLAIN_PLAN), null); assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null); - assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS),null); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString()); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null); } } assertTrue(foundQueryLog); @@ -222,12 +236,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -255,7 +269,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { props.setProperty(QueryServices.LOG_LEVEL, loglevel.name()); Connection conn = DriverManager.getConnection(getUrl(),props); assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel); - + final MyClock clock = new MyClock(100); + EnvironmentEdgeManager.injectEdge(clock); + try{ String query = "SELECT * FROM " + tableName +" where V = ?"; PreparedStatement pstmt = conn.prepareStatement(query); @@ -270,12 +286,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { ResultSet explainRS = conn.createStatement() .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'"); String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; - + // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -286,14 +302,18 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); + assertTrue(LogLevel.TRACE == loglevel ? rs.getString(SCAN_METRICS_JSON).contains("scanMetrics") + : rs.getString(SCAN_METRICS_JSON) == null); + assertEquals(rs.getTimestamp(START_TIME).getTime(),100); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); } } assertTrue(foundQueryLog); conn.close(); + }finally{ + EnvironmentEdgeManager.injectEdge(null); + } } @@ -315,14 +335,14 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()); } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - ResultSet rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + ResultSet rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { - if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) { + if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) { foundQueryLog = true; assertEquals(rs.getString(USER), System.getProperty("user.name")); assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); @@ -331,8 +351,6 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); assertEquals(rs.getString(QUERY), query); assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage())); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); } } assertTrue(foundQueryLog); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index e45ddcd..73cdf0a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -10,6 +10,10 @@ package org.apache.phoenix.monitoring; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE; @@ -29,19 +33,6 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME; - -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED; - import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; @@ -73,11 +64,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.LoggingPhoenixConnection; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixMetricsLog; import org.apache.phoenix.jdbc.PhoenixResultSet; -import org.apache.phoenix.jdbc.LoggingPhoenixConnection; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; @@ -109,6 +101,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); // disable renewing leases as this will force spooling to happen. props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); + props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); // need the non-test driver for some tests that check number of hconnections, etc. DriverManager.registerDriver(PhoenixDriver.INSTANCE); @@ -377,6 +370,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { insertRowsInTable(tableName, numRows); Properties props = new Properties(); props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false"); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name()); Connection conn = DriverManager.getConnection(getUrl(), props); ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); while (rs.next()) {} @@ -706,7 +700,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { private void changeInternalStateForTesting(PhoenixResultSet rs) { // get and set the internal state for testing purposes. - ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true); + ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.DEBUG); StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context"); Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue); Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue); @@ -772,8 +766,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { private class TestReadMetricsQueue extends ReadMetricQueue { - public TestReadMetricsQueue(boolean isRequestMetricsEnabled) { - super(isRequestMetricsEnabled); + public TestReadMetricsQueue(LogLevel connectionLogLevel) { + super(connectionLogLevel); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 3ea5dd5..4358ee3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -135,8 +135,8 @@ public class StatementContext { this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps .<PColumn, Integer> newLinkedHashMap(); this.subqueryResults = Maps.<SelectStatement, Object> newHashMap(); - this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled); - this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled); + this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel()); + this.overAllQueryMetrics = new OverAllQueryMetrics(connection.getLogLevel()); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 18f4fea..f3a383e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -39,8 +39,8 @@ import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -1181,7 +1181,7 @@ public class MutationState implements SQLCloseable { numFailedMutations = uncommittedStatementIndexes.length; GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); } finally { - MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations); + MutationMetric mutationsMetric = new MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations); mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); try { if (cache!=null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index 8595fd4..acb6c04 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -147,7 +147,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { String tableName = tableRef.getTable().getPhysicalName().getString(); ReadMetricQueue readMetrics = context.getReadMetricsQueue(); ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, - readMetrics.isRequestMetricsEnabled()); + context.getConnection().getLogLevel()); long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(new TableResultIterator(mutationState, scan, http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 41d278d..262ae44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -107,11 +107,10 @@ public class ParallelIterators extends BaseResultIterators { context.getOverallQueryMetrics().updateNumParallelScans(numScans); GLOBAL_NUM_PARALLEL_SCANS.update(numScans); final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); - boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); for (final ScanLocator scanLocation : scanLocations) { final Scan scan = scanLocation.getScan(); final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, physicalTableName, - scan, isRequestMetricsEnabled); + scan, context.getConnection().getLogLevel()); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator( http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index c13fcdb..1693421 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -171,7 +171,6 @@ public class SerialIterators extends BaseResultIterators { return EMPTY_ITERATOR; } ReadMetricQueue readMetrics = context.getReadMetricsQueue(); - boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); while (index < scans.size()) { Scan currentScan = scans.get(index++); if (remainingOffset != null) { @@ -179,7 +178,7 @@ public class SerialIterators extends BaseResultIterators { } ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, currentScan, - isRequestMetricsEnabled); + context.getConnection().getLogLevel()); TableResultIterator itr = new TableResultIterator(mutationState, currentScan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index d3626f8..312d17b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -364,9 +364,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea function.getTenantId()))); } }; - this.isRequestLevelMetricsEnabled = JDBCUtil - .isCollectingRequestLevelMetricsEnabled(url, info, - this.services.getProps()); + this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, + QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, + this.services.getProps()); this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState); this.metaData = metaData; @@ -380,8 +381,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.scannerQueue = new LinkedBlockingQueue<>(); this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory(); this.isRunningUpgrade = isRunningUpgrade; - this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, - QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 153fa08..84816a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -51,8 +51,8 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.log.QueryLogInfo; -import org.apache.phoenix.log.QueryLogState; import org.apache.phoenix.log.QueryLogger; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.monitoring.ReadMetricQueue; @@ -76,8 +76,6 @@ import org.apache.phoenix.util.SQLCloseable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; @@ -133,9 +131,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private Long count = 0L; - private QueryLogState logStatus = QueryLogState.COMPLETED; + private Object exception; + - private RuntimeException exception; public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException { this.rowProjector = rowProjector; @@ -144,7 +142,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { this.statement = context.getStatement(); this.readMetricsQueue = context.getReadMetricsQueue(); this.overAllQueryMetrics = context.getOverallQueryMetrics(); - this.queryLogger = context.getQueryLogger(); + this.queryLogger = context.getQueryLogger() != null ? context.getQueryLogger() : QueryLogger.NO_OP_INSTANCE; } @Override @@ -181,6 +179,19 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { statement.getResultSets().remove(this); overAllQueryMetrics.endQuery(); overAllQueryMetrics.stopResultSetWatch(); + if (!queryLogger.isSynced()) { + if(this.exception==null){ + queryLogger.log(QueryLogInfo.QUERY_STATUS_I,QueryStatus.COMPLETED.toString()); + } + queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); + if (queryLogger.isDebugEnabled()) { + queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I, + readMetricsQueue.getScanMetricsHolderList().toString()); + readMetricsQueue.getScanMetricsHolderList().clear(); + } + // if not already synced , like closing before result set exhausted + queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics()); + } } } @@ -799,36 +810,33 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { } rowProjector.reset(); } catch (RuntimeException e) { - this.logStatus=QueryLogState.FAILED; // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString()); + if (queryLogger.isDebugEnabled()) { + queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + } this.exception = e; if (e.getCause() instanceof SQLException) { throw (SQLException) e.getCause(); } throw e; }finally{ - if (currentRow == null && queryLogger != null ) { + if (this.exception!=null) { + queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); if (queryLogger.isDebugEnabled()) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); - queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, - System.currentTimeMillis() - queryLogger.getStartTime()); - queryLogBuilder.put(QueryLogInfo.SCAN_METRICS_JSON_I, + queryLogger.log(QueryLogInfo.SCAN_METRICS_JSON_I, readMetricsQueue.getScanMetricsHolderList().toString()); - if (this.exception != null) { - queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, - Throwables.getStackTraceAsString(this.exception)); - } readMetricsQueue.getScanMetricsHolderList().clear(); - queryLogger.log(logStatus, queryLogBuilder.build()); + } + if (queryLogger != null) { + queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics()); } } - } - if (currentRow == null) { - - overAllQueryMetrics.endQuery(); - overAllQueryMetrics.stopResultSetWatch(); + if (currentRow == null) { + overAllQueryMetrics.endQuery(); + overAllQueryMetrics.stopResultSetWatch(); + } } return currentRow != null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 25b9fb0..015f04c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -93,7 +93,7 @@ import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.log.QueryLogInfo; -import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.log.QueryLoggerUtil; import org.apache.phoenix.optimize.Cost; @@ -190,8 +190,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.math.IntMath; @@ -319,10 +317,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { StatementContext context = plan.getContext(); context.setQueryLogger(queryLogger); if(queryLogger.isDebugEnabled()){ - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator)); - queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null); - queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build()); + queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator)); + queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null); } context.getOverallQueryMetrics().startQuery(); PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext()); @@ -351,6 +347,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } throw e; }catch (RuntimeException e) { + // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. if (e.getCause() instanceof SQLException) { @@ -367,11 +364,9 @@ public class PhoenixStatement implements Statement, SQLCloseable { }, PhoenixContextExecutor.inContext()); }catch (Exception e) { if (queryLogger.isDebugEnabled()) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, - System.currentTimeMillis() - queryLogger.getStartTime()); - queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); - queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build()); + queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString()); + queryLogger.sync(null, null); } Throwables.propagateIfInstanceOf(e, SQLException.class); Throwables.propagate(e); @@ -1781,7 +1776,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable); QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(), - connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters()); + connection.getQueryServices(), sql, getParameters()); return queryLogger; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java index 5792658..269b4f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java @@ -18,5 +18,5 @@ package org.apache.phoenix.log; public enum LogLevel { - OFF, INFO, DEBUG, TRACE + OFF,INFO, DEBUG, TRACE } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java index 817f9ec..dab03e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java @@ -31,16 +31,18 @@ public interface LogWriter { * @param event * @throws SQLException * @throws IOException + * @throws ClassNotFoundException */ - void write(RingBufferEvent event) throws SQLException, IOException; + void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException; /** * will be called when disruptor is getting shutdown * * @throws IOException + * @throws SQLException */ - void close() throws IOException; + void close() throws IOException, SQLException; /** * if writer is closed and cannot write further event http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java index 87de267..fb38ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java @@ -28,8 +28,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; import org.apache.phoenix.schema.types.PDataType; @@ -40,29 +40,27 @@ import org.apache.phoenix.schema.types.PVarchar; public enum QueryLogInfo { - CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE), - QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE), - QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE), - USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), - GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), - NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), - EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), - QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), - TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), - SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE); + CLIENT_IP_I(CLIENT_IP, LogLevel.INFO, PVarchar.INSTANCE), + QUERY_I(QUERY, LogLevel.INFO,PVarchar.INSTANCE), + BIND_PARAMETERS_I(BIND_PARAMETERS, LogLevel.TRACE,PVarchar.INSTANCE), + QUERY_ID_I(QUERY_ID, LogLevel.INFO,PVarchar.INSTANCE), + TENANT_ID_I(TENANT_ID, LogLevel.INFO,PVarchar.INSTANCE), + START_TIME_I(START_TIME, LogLevel.INFO,PTimestamp.INSTANCE), + USER_I(USER, LogLevel.INFO,PVarchar.INSTANCE), + EXPLAIN_PLAN_I(EXPLAIN_PLAN,LogLevel.DEBUG,PVarchar.INSTANCE), + GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS, LogLevel.DEBUG,PVarchar.INSTANCE), + NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED, LogLevel.INFO,PLong.INSTANCE), + EXCEPTION_TRACE_I(EXCEPTION_TRACE, LogLevel.DEBUG,PVarchar.INSTANCE), + QUERY_STATUS_I(QUERY_STATUS, LogLevel.INFO,PVarchar.INSTANCE), + SCAN_METRICS_JSON_I(SCAN_METRICS_JSON, LogLevel.TRACE,PVarchar.INSTANCE), + TABLE_NAME_I(TABLE_NAME, LogLevel.DEBUG,PVarchar.INSTANCE); public final String columnName; - public final QueryLogState logState; public final LogLevel logLevel; public final PDataType dataType; - private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) { + private QueryLogInfo(String columnName, LogLevel logLevel, PDataType dataType) { this.columnName = columnName; - this.logState=logState; this.logLevel=logLevel; this.dataType=dataType; } @@ -71,10 +69,6 @@ public enum QueryLogInfo { return columnName; } - public QueryLogState getLogState() { - return logState; - } - public LogLevel getLogLevel() { return logLevel; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java deleted file mode 100644 index e27f0e8..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.log; - -public enum QueryLogState { - STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java index b2fb235..ef5559c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java @@ -17,13 +17,17 @@ */ package org.apache.phoenix.log; +import java.util.Map; import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.util.EnvironmentEdgeManager; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import io.netty.util.internal.ThreadLocalRandom; @@ -34,15 +38,17 @@ public class QueryLogger { private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>(); private QueryLoggerDisruptor queryDisruptor; private String queryId; - private Long startTime; private LogLevel logLevel; + private Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + private boolean isSynced; private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class); private QueryLogger(PhoenixConnection connection) { this.queryId = UUID.randomUUID().toString(); this.queryDisruptor = connection.getQueryServices().getQueryDisruptor(); - this.startTime = System.currentTimeMillis(); logLevel = connection.getLogLevel(); + log(QueryLogInfo.QUERY_ID_I, queryId); + log(QueryLogInfo.START_TIME_I, EnvironmentEdgeManager.currentTimeMillis()); } private QueryLogger() { @@ -58,21 +64,32 @@ public class QueryLogger { return result; } - private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() { + public static final QueryLogger NO_OP_INSTANCE = new QueryLogger() { @Override - public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { + public void log(QueryLogInfo queryLogInfo, Object info) { } - + @Override - public boolean isDebugEnabled(){ + public boolean isDebugEnabled() { return false; } - + @Override - public boolean isInfoEnabled(){ + public boolean isInfoEnabled() { return false; } + + @Override + public void sync( + Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + + } + + @Override + public boolean isSynced(){ + return true; + } }; public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) { @@ -82,14 +99,14 @@ public class QueryLogger { } /** - * Add query log in the table, columns will be logged depending upon the connection logLevel - * @param logState State of the query - * @param map Value of the map should be in format of the corresponding data type + * Add query log in the table, columns will be logged depending upon the connection logLevel */ - public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { - final RingBufferEventTranslator translator = getCachedTranslator(); - translator.setQueryInfo(logState, map, logLevel); - publishLogs(translator); + public void log(QueryLogInfo queryLogInfo, Object info) { + try { + queryLogBuilder.put(queryLogInfo, info); + } catch (Exception e) { + LOG.warn("Unable to add log info because of " + e.getMessage()); + } } private boolean publishLogs(RingBufferEventTranslator translator) { @@ -102,13 +119,6 @@ public class QueryLogger { } /** - * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time - */ - public Long getStartTime() { - return startTime != null ? startTime : System.currentTimeMillis(); - } - - /** * Is debug logging currently enabled? * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug. */ @@ -117,7 +127,8 @@ public class QueryLogger { } private boolean isLevelEnabled(LogLevel logLevel){ - return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false; + return this.logLevel != null && logLevel != LogLevel.OFF ? logLevel.ordinal() <= this.logLevel.ordinal() + : false; } /** @@ -142,4 +153,21 @@ public class QueryLogger { return this.queryId; } + + public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + if (!isSynced) { + isSynced = true; + final RingBufferEventTranslator translator = getCachedTranslator(); + translator.setQueryInfo(logLevel, queryLogBuilder.build(), readMetrics, overAllMetrics); + publishLogs(translator); + } + } + + /** + * Is Synced already + */ + public boolean isSynced(){ + return this.isSynced; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java index d5c4878..21917ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java @@ -25,48 +25,36 @@ import org.apache.commons.lang.StringUtils; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PName; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; - public class QueryLoggerUtil { + public static void logInitialDetails(QueryLogger queryLogger, PName tenantId, ConnectionQueryServices queryServices, - String query, long startTime, List<Object> bindParameters) { + String query, List<Object> bindParameters) { try { - queryLogger.log(QueryLogState.STARTED, - getInitialDetails(tenantId, queryServices, query, startTime, bindParameters)); + String clientIP; + try { + clientIP = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + clientIP = "UnknownHost"; + } + + if (clientIP != null) { + queryLogger.log(QueryLogInfo.CLIENT_IP_I, clientIP); + } + if (query != null) { + queryLogger.log(QueryLogInfo.QUERY_I, query); + } + if (bindParameters != null) { + queryLogger.log(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ",")); + } + if (tenantId != null) { + queryLogger.log(QueryLogInfo.TENANT_ID_I, tenantId.getString()); + } + + queryLogger.log(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName() + : queryServices.getUser().getShortName()); } catch (Exception e) { - // Ignore for now - } - - } - - private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId, - ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - String clientIP; - try { - clientIP = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - clientIP = "UnknownHost"; + // Ignore } - - if (clientIP != null) { - queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP); - } - if (query != null) { - queryLogBuilder.put(QueryLogInfo.QUERY_I, query); - } - queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime); - if (bindParameters != null) { - queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ",")); - } - if (tenantId != null) { - queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString()); - } - - queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName() - : queryServices.getUser().getShortName()); - return queryLogBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java new file mode 100644 index 0000000..0e634c1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +public enum QueryStatus { + COMPILED, COMPLETED,FAILED +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java index 96e4bf9..8854e68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java @@ -17,14 +17,19 @@ */ package org.apache.phoenix.log; +import java.util.Map; + +import org.apache.phoenix.monitoring.MetricType; + import com.google.common.collect.ImmutableMap; import com.lmax.disruptor.EventFactory; class RingBufferEvent { private String queryId; - private QueryLogState logState; private LogLevel connectionLogLevel; private ImmutableMap<QueryLogInfo, Object> queryInfo; + private Map<String, Map<MetricType, Long>> readMetrics; + private Map<MetricType, Long> overAllMetrics; public static final Factory FACTORY = new Factory(); @@ -40,7 +45,6 @@ import com.lmax.disruptor.EventFactory; } public void clear() { - this.logState=null; this.queryInfo=null; this.queryId=null; } @@ -53,10 +57,6 @@ import com.lmax.disruptor.EventFactory; public static Factory getFactory() { return FACTORY; } - - public QueryLogState getLogState() { - return logState; - } public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) { this.queryInfo=queryInfo; @@ -73,12 +73,6 @@ import com.lmax.disruptor.EventFactory; } - public void setLogState(QueryLogState logState) { - this.logState=logState; - - } - - public LogLevel getConnectionLogLevel() { return connectionLogLevel; } @@ -88,6 +82,26 @@ import com.lmax.disruptor.EventFactory; this.connectionLogLevel = connectionLogLevel; } + + public Map<String, Map<MetricType, Long>> getReadMetrics() { + return readMetrics; + } + + + public void setReadMetrics(Map<String, Map<MetricType, Long>> readMetrics) { + this.readMetrics = readMetrics; + } + + + public Map<MetricType, Long> getOverAllMetrics() { + return overAllMetrics; + } + + + public void setOverAllMetrics(Map<MetricType, Long> overAllMetrics) { + this.overAllMetrics = overAllMetrics; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java index 653ddd6..742f8e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java @@ -17,14 +17,19 @@ */ package org.apache.phoenix.log; +import java.util.Map; + +import org.apache.phoenix.monitoring.MetricType; + import com.google.common.collect.ImmutableMap; import com.lmax.disruptor.EventTranslator; class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> { private String queryId; - private QueryLogState logState; private ImmutableMap<QueryLogInfo, Object> queryInfo; private LogLevel connectionLogLevel; + private Map<String, Map<MetricType, Long>> readMetrics; + private Map<MetricType, Long> overAllMetrics; public RingBufferEventTranslator(String queryId) { this.queryId=queryId; @@ -34,20 +39,22 @@ class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> { public void translateTo(RingBufferEvent event, long sequence) { event.setQueryId(queryId); event.setQueryInfo(queryInfo); - event.setLogState(logState); + event.setReadMetrics(readMetrics); + event.setOverAllMetrics(overAllMetrics); event.setConnectionLogLevel(connectionLogLevel); clear(); } private void clear() { - setQueryInfo(null,null,null); + setQueryInfo(null,null,null,null); } - public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo, - LogLevel connectionLogLevel) { + public void setQueryInfo(LogLevel logLevel, ImmutableMap<QueryLogInfo, Object> queryInfo, Map<String, Map<MetricType, Long>> readMetrics, + Map<MetricType, Long> overAllMetrics) { this.queryInfo = queryInfo; - this.logState = logState; - this.connectionLogLevel = connectionLogLevel; + this.connectionLogLevel = logLevel; + this.readMetrics = readMetrics; + this.overAllMetrics=overAllMetrics; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java index c102855..0209951 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java @@ -21,23 +21,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Map.Entry; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.expression.Determinism; -import org.apache.phoenix.expression.LiteralExpression; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.util.QueryUtil; import com.google.common.collect.ImmutableMap; @@ -49,75 +43,111 @@ public class TableLogWriter implements LogWriter { private static final Log LOG = LogFactory.getLog(LogWriter.class); private Connection connection; private boolean isClosed; - private Table table; + private PreparedStatement upsertStatement; private Configuration config; + private Map<MetricType,Integer> metricOrdinals=new HashMap<MetricType,Integer>(); public TableLogWriter(Configuration configuration) { - this.config = configuration; - try { - this.connection = ConnectionFactory.createConnection(configuration); - table = this.connection.getTable(SchemaUtil.getPhysicalTableName( - SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config)); - } catch (Exception e) { - LOG.warn("Unable to initiate LogWriter for writing query logs to table"); + this.config=configuration; + } + + private PreparedStatement buildUpsertStatement(Connection conn) throws SQLException { + StringBuilder buf = new StringBuilder("UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"("); + int queryLogEntries=0; + for (QueryLogInfo info : QueryLogInfo.values()) { + buf.append(info.columnName); + buf.append(','); + queryLogEntries++; + } + for (MetricType metric : MetricType.values()) { + if (metric.logLevel() != LogLevel.OFF) { + metricOrdinals.put(metric, ++queryLogEntries); + buf.append(metric.columnName()); + buf.append(','); + } + } + buf.setLength(buf.length()-1); + buf.append(") VALUES ("); + for (int i = 0; i < QueryLogInfo.values().length; i++) { + buf.append("?,"); } + for (MetricType metric : MetricType.values()) { + if (metric.logLevel() != LogLevel.OFF) { + buf.append("?,"); + } + } + buf.setLength(buf.length()-1); + buf.append(")"); + return conn.prepareStatement(buf.toString()); } @Override - public void write(RingBufferEvent event) throws SQLException, IOException { - if(isClosed()){ + public void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException { + if (isClosed()) { LOG.warn("Unable to commit query log as Log committer is already closed"); return; } - if (table == null || connection == null) { - LOG.warn("Unable to commit query log as connection was not initiated "); - return; + if (connection == null) { + synchronized (this) { + if (connection == null) { + connection = QueryUtil.getConnectionForQueryLog(this.config); + this.upsertStatement = buildUpsertStatement(connection); + } + } } - ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo(); - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - Put put =new Put(Bytes.toBytes(event.getQueryId())); - for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) { - if (entry.getKey().logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) { - LiteralExpression expression = LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType, - Determinism.ALWAYS); - expression.evaluate(null, ptr); - put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(entry.getKey().columnName), - ByteUtil.copyKeyBytesIfNecessary(ptr)); + + ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo(); + for (QueryLogInfo info : QueryLogInfo.values()) { + if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) { + upsertStatement.setObject(info.ordinal() + 1, queryInfoMap.get(info)); + } else { + upsertStatement.setObject(info.ordinal() + 1, null); } } - - if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal() - && (event.getLogState() == QueryLogState.COMPLETED || event.getLogState() == QueryLogState.FAILED)) { - LiteralExpression expression = LiteralExpression.newConstant(event.getLogState().toString(), - QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS); - expression.evaluate(null, ptr); - put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), ByteUtil.copyKeyBytesIfNecessary(ptr)); + Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics(); + Map<String, Map<MetricType, Long>> readMetrics = event.getReadMetrics(); + + for (MetricType metric : MetricType.values()) { + if (overAllMetrics != null && overAllMetrics.containsKey(metric) + && metric.isLoggingEnabled(event.getConnectionLogLevel())) { + upsertStatement.setObject(metricOrdinals.get(metric), overAllMetrics.get(metric)); + } else { + if (metric.logLevel() != LogLevel.OFF) { + upsertStatement.setObject(metricOrdinals.get(metric), null); + } + } + } + + if (readMetrics != null && !readMetrics.isEmpty()) { + for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) { + upsertStatement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() + 1, entry.getKey()); + for (MetricType metric : entry.getValue().keySet()) { + if (metric.isLoggingEnabled(event.getConnectionLogLevel())) { + upsertStatement.setObject(metricOrdinals.get(metric), entry.getValue().get(metric)); + } + } + upsertStatement.executeUpdate(); + } + } else { + upsertStatement.executeUpdate(); } - put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - table.put(put); - + connection.commit(); } @Override public void close() throws IOException { - if(isClosed()){ - return; - } - isClosed=true; + if (isClosed()) { return; } + isClosed = true; try { - if (table != null) { - table.close(); - } - if (connection != null && !connection.isClosed()) { - //It should internally close all the statements + if (connection != null) { + // It should internally close all the statements connection.close(); } - } catch (IOException e) { + } catch (SQLException e) { // TODO Ignore? } } - + public boolean isClosed(){ return isClosed; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index ec1b451..58c048b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -35,7 +35,15 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.iterate.*; +import org.apache.phoenix.iterate.ConcatResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.iterate.PeekingResultIterator; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.RoundRobinResultIterator; +import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.iterate.TableSnapshotResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.monitoring.ReadMetricQueue; @@ -112,7 +120,6 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null services.clearTableRegionCache(tableNameBytes); long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); - boolean isRequestMetricsEnabled = readMetrics.isRequestMetricsEnabled(); for (Scan scan : scans) { // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); @@ -120,7 +127,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null PeekingResultIterator peekingResultIterator; ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan, - isRequestMetricsEnabled); + queryPlan.getContext().getConnection().getLogLevel()); if (snapshotName != null) { // result iterator to read snapshots final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan, http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java index 0e82ce4..daa0bba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java @@ -26,7 +26,6 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME; public class MemoryMetricsHolder { private final CombinableMetric memoryChunkSizeMetric; private final CombinableMetric memoryWaitTimeMetric; - public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null); public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) { this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index ef6eceb..8e1de66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -17,62 +17,74 @@ */ package org.apache.phoenix.monitoring; +import org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PLong; + + +/** + * Keeping {@link LogLevel#OFF} for metrics which are calculated globally only and doesn't need to be logged in SYSTEM.LOG + */ public enum MetricType { - NO_OP_METRIC("no", "No op metric"), + NO_OP_METRIC("no", "No op metric",LogLevel.OFF, PLong.INSTANCE), // mutation (write) related metrics - MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch"), - MUTATION_BYTES("mb", "Size of mutations in bytes"), - MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"), - MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed"), - MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements"), + MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE), + MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE), + MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE), + MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE), + MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE), // query (read) related metrics - QUERY_TIME("qt", "Query times"), - QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"), - QUERY_FAILED_COUNTER("qf", "Number of times query failed"), - NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"), - SCAN_BYTES("sb", "Number of bytes read by scans"), - SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"), + QUERY_TIME("qt", "Query times",LogLevel.OFF, PLong.INSTANCE), + QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out",LogLevel.DEBUG, PLong.INSTANCE), + QUERY_FAILED_COUNTER("qf", "Number of times query failed",LogLevel.DEBUG, PLong.INSTANCE), + NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel",LogLevel.DEBUG, PLong.INSTANCE), + SCAN_BYTES("sb", "Number of bytes read by scans",LogLevel.OFF, PLong.INSTANCE), + SELECT_SQL_COUNTER("sc", "Counter for number of sql queries",LogLevel.OFF, PLong.INSTANCE), // task metrics - TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor"), - TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion"), - TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"), - TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor"), - TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor"), + TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), + TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion",LogLevel.DEBUG, PLong.INSTANCE), + TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute",LogLevel.DEBUG, PLong.INSTANCE), + TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), + TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), // spool metrics - SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"), - SPOOL_FILE_COUNTER("sn", "Number of spool files created"), + SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes",LogLevel.DEBUG, PLong.INSTANCE), + SPOOL_FILE_COUNTER("sn", "Number of spool files created",LogLevel.DEBUG, PLong.INSTANCE), // misc metrics - MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager"), - MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager"), - CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits"), - WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution"), - RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()"), - OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections"), - QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated"), - HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver"), + MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager",LogLevel.DEBUG, PLong.INSTANCE), + MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager",LogLevel.DEBUG, PLong.INSTANCE), + CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE), + WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), + RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), + OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), + QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), + HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE), PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " + - "because there are already too many to that target cluster."), - PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not."), + "because there are already too many to that target cluster.",LogLevel.OFF, PLong.INSTANCE), + PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE), // hbase metrics - COUNT_RPC_CALLS("rp", "Number of RPC calls"), - COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls"), - COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls"), - COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught"), - COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers"), - COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers"), - COUNT_SCANNED_REGIONS("rg", "Number of regions scanned"), - COUNT_RPC_RETRIES("rpr", "Number of RPC retries"), - COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries"), - COUNT_ROWS_SCANNED("ws", "Number of rows scanned"), - COUNT_ROWS_FILTERED("wf", "Number of rows filtered"); + COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_MILLS_BETWEEN_NEXTS("n", "Sum of milliseconds between sequential next calls",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_NOT_SERVING_REGION_EXCEPTION("nsr", "Number of NotServingRegionException caught",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_BYTES_REGION_SERVER_RESULTS("rs", "Number of bytes in Result objects from region servers",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_BYTES_IN_REMOTE_RESULTS("rrs", "Number of bytes in Result objects from remote region servers",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_SCANNED_REGIONS("rg", "Number of regions scanned",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_RPC_RETRIES("rpr", "Number of RPC retries",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_ROWS_SCANNED("ws", "Number of rows scanned",LogLevel.DEBUG, PLong.INSTANCE), + COUNT_ROWS_FILTERED("wf", "Number of rows filtered",LogLevel.DEBUG,PLong.INSTANCE); private final String description; private final String shortName; + private LogLevel logLevel; + private PDataType dataType; - private MetricType(String shortName, String description) { + private MetricType(String shortName, String description, LogLevel logLevel, PDataType dataType) { this.shortName = shortName; this.description = description; + this.logLevel=logLevel; + this.dataType=dataType; } public String description() { @@ -82,5 +94,34 @@ public enum MetricType { public String shortName() { return shortName; } + + public LogLevel logLevel() { + return logLevel; + } + + public PDataType dataType() { + return dataType; + } + + public String columnName() { + return name(); + } + + public boolean isLoggingEnabled(LogLevel connectionLogLevel){ + return logLevel() != LogLevel.OFF && (logLevel().ordinal() <= connectionLogLevel.ordinal()); + } + public static String getMetricColumnsDetails() { + StringBuilder buffer=new StringBuilder(); + for(MetricType metric:MetricType.values()){ + if (metric.logLevel() != LogLevel.OFF) { + buffer.append(metric.columnName()); + buffer.append(" "); + buffer.append(metric.dataType.getSqlTypeName()); + buffer.append(","); + } + } + return buffer.toString(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7afaceb7/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java new file mode 100644 index 0000000..1e5ac08 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; + +public class MetricUtil { + + public static CombinableMetric getCombinableMetric(LogLevel connectionLogLevel, MetricType type) { + if (!type.isLoggingEnabled(connectionLogLevel)) { return NoOpRequestMetric.INSTANCE; } + return new CombinableMetricImpl(type); + } + +}