PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b5607eef Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b5607eef Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b5607eef Branch: refs/heads/4.x-HBase-0.98 Commit: b5607eeff5fc1648e4c98f3d01f29b82ae7be1bd Parents: 75e8f70 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Mon May 14 14:10:44 2018 -0700 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Mon May 14 14:10:44 2018 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/QueryLoggerIT.java | 74 ++++++---- .../phoenix/monitoring/PhoenixMetricsIT.java | 11 +- .../phoenix/compile/StatementContext.java | 4 +- .../apache/phoenix/execute/MutationState.java | 4 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 10 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 39 +++--- .../apache/phoenix/jdbc/PhoenixStatement.java | 21 ++- .../java/org/apache/phoenix/log/LogLevel.java | 2 +- .../java/org/apache/phoenix/log/LogWriter.java | 6 +- .../org/apache/phoenix/log/QueryLogInfo.java | 38 +++-- .../org/apache/phoenix/log/QueryLogState.java | 22 --- .../org/apache/phoenix/log/QueryLogger.java | 74 +++++++--- .../org/apache/phoenix/log/QueryLoggerUtil.java | 62 ++++----- .../org/apache/phoenix/log/QueryStatus.java | 22 +++ .../org/apache/phoenix/log/RingBufferEvent.java | 38 +++-- .../phoenix/log/RingBufferEventTranslator.java | 21 ++- .../org/apache/phoenix/log/TableLogWriter.java | 139 ++++++++++++------- .../phoenix/mapreduce/PhoenixRecordReader.java | 10 +- .../phoenix/monitoring/MemoryMetricsHolder.java | 1 - .../apache/phoenix/monitoring/MetricType.java | 112 ++++++++++----- .../apache/phoenix/monitoring/MetricUtil.java | 30 ++++ .../phoenix/monitoring/MutationMetricQueue.java | 18 ++- .../phoenix/monitoring/OverAllQueryMetrics.java | 21 ++- .../phoenix/monitoring/ReadMetricQueue.java | 23 +-- .../monitoring/SpoolingMetricsHolder.java | 3 +- .../monitoring/TaskExecutionMetricsHolder.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 7 +- .../query/ConnectionlessQueryServicesImpl.java | 7 +- .../apache/phoenix/query/QueryConstants.java | 124 ++--------------- .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../java/org/apache/phoenix/util/QueryUtil.java | 7 +- .../org/apache/phoenix/util/SchemaUtil.java | 4 + .../iterate/SpoolingResultIteratorTest.java | 7 +- 35 files changed, 533 insertions(+), 436 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java index d6cc096..30d3222 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java @@ -26,11 +26,11 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITER import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -52,8 +52,10 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.log.LogLevel; -import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; @@ -76,6 +78,19 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { DriverManager.registerDriver(PhoenixDriver.INSTANCE); } + private static class MyClock extends EnvironmentEdge { + public volatile long time; + + public MyClock (long time) { + this.time = time; + } + + @Override + public long currentTime() { + return time; + } + } + @Test public void testDebugLogs() throws Exception { @@ -96,12 +111,13 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query); String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -112,10 +128,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); + assertTrue(rs.getString(SCAN_METRICS_JSON)==null); assertEquals(rs.getString(EXCEPTION_TRACE),null); }else{ //confirm we are not logging system queries @@ -138,7 +153,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { String query = "SELECT * FROM " + tableName; int count=100; for (int i = 0; i < count; i++) { - conn.createStatement().executeQuery(query); + ResultSet rs = conn.createStatement().executeQuery(query); + while(rs.next()){ + + } } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; @@ -176,12 +194,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -189,12 +207,10 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); assertEquals(rs.getString(EXPLAIN_PLAN), null); assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null); - assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS),null); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS),QueryStatus.COMPLETED.toString()); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null); } } assertTrue(foundQueryLog); @@ -220,12 +236,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -253,7 +269,9 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { props.setProperty(QueryServices.LOG_LEVEL, loglevel.name()); Connection conn = DriverManager.getConnection(getUrl(),props); assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel); - + final MyClock clock = new MyClock(100); + EnvironmentEdgeManager.injectEdge(clock); + try{ String query = "SELECT * FROM " + tableName +" where V = ?"; PreparedStatement pstmt = conn.prepareStatement(query); @@ -268,12 +286,12 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { ResultSet explainRS = conn.createStatement() .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'"); String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; - + // sleep for sometime to let query log committed Thread.sleep(delay); + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { if (rs.getString(QUERY_ID).equals(queryId)) { foundQueryLog = true; @@ -284,14 +302,16 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1); assertEquals(rs.getString(QUERY), query); - assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(QUERY_STATUS), QueryStatus.COMPLETED.toString()); + assertEquals(rs.getTimestamp(START_TIME).getTime(),100); assertEquals(rs.getString(TENANT_ID), null); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); } } assertTrue(foundQueryLog); conn.close(); + }finally{ + EnvironmentEdgeManager.injectEdge(null); + } } @@ -313,14 +333,14 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()); } String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; - ResultSet rs = conn.createStatement().executeQuery(logQuery); - boolean foundQueryLog = false; int delay = 5000; // sleep for sometime to let query log committed Thread.sleep(delay); + ResultSet rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; while (rs.next()) { - if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) { + if (QueryStatus.FAILED.name().equals(rs.getString(QUERY_STATUS))) { foundQueryLog = true; assertEquals(rs.getString(USER), System.getProperty("user.name")); assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); @@ -329,8 +349,6 @@ public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); assertEquals(rs.getString(QUERY), query); assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage())); - assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); - assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); } } assertTrue(foundQueryLog); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 81428e6..4c5de79 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -60,11 +60,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.LoggingPhoenixConnection; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixMetricsLog; import org.apache.phoenix.jdbc.PhoenixResultSet; -import org.apache.phoenix.jdbc.LoggingPhoenixConnection; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; @@ -94,6 +95,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); // disable renewing leases as this will force spooling to happen. props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); + props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); // need the non-test driver for some tests that check number of hconnections, etc. DriverManager.registerDriver(PhoenixDriver.INSTANCE); @@ -352,6 +354,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { insertRowsInTable(tableName, numRows); Properties props = new Properties(); props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false"); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name()); Connection conn = DriverManager.getConnection(getUrl(), props); ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); while (rs.next()) {} @@ -681,7 +684,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { private void changeInternalStateForTesting(PhoenixResultSet rs) { // get and set the internal state for testing purposes. - ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true); + ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.DEBUG); StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context"); Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue); Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue); @@ -754,8 +757,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { private class TestReadMetricsQueue extends ReadMetricQueue { - public TestReadMetricsQueue(boolean isRequestMetricsEnabled) { - super(isRequestMetricsEnabled); + public TestReadMetricsQueue(LogLevel connectionLogLevel) { + super(connectionLogLevel); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 3ea5dd5..4358ee3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -135,8 +135,8 @@ public class StatementContext { this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps .<PColumn, Integer> newLinkedHashMap(); this.subqueryResults = Maps.<SelectStatement, Object> newHashMap(); - this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled); - this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled); + this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel()); + this.overAllQueryMetrics = new OverAllQueryMetrics(connection.getLogLevel()); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 0f995b3..1e75b75 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -39,8 +39,8 @@ import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -1182,7 +1182,7 @@ public class MutationState implements SQLCloseable { numFailedMutations = uncommittedStatementIndexes.length; GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); } finally { - MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations); + MutationMetric mutationsMetric = new MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations); mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); try { if (cache!=null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index bb3d447..43ad427 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -356,9 +356,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea function.getTenantId()))); } }; - this.isRequestLevelMetricsEnabled = JDBCUtil - .isCollectingRequestLevelMetricsEnabled(url, info, - this.services.getProps()); + this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, + QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, + this.services.getProps()); this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState); this.metaData = metaData; @@ -372,8 +373,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.scannerQueue = new LinkedBlockingQueue<>(); this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory(); this.isRunningUpgrade = isRunningUpgrade; - this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, - QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 909b59d..1bc47db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -51,8 +51,8 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.log.QueryLogInfo; -import org.apache.phoenix.log.QueryLogState; import org.apache.phoenix.log.QueryLogger; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.monitoring.ReadMetricQueue; @@ -76,8 +76,6 @@ import org.apache.phoenix.util.SQLCloseable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; @@ -133,9 +131,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private Long count = 0L; - private QueryLogState logStatus = QueryLogState.COMPLETED; + private Object exception; + - private RuntimeException exception; public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException { this.rowProjector = rowProjector; @@ -144,7 +142,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { this.statement = context.getStatement(); this.readMetricsQueue = context.getReadMetricsQueue(); this.overAllQueryMetrics = context.getOverallQueryMetrics(); - this.queryLogger = context.getQueryLogger(); + this.queryLogger = context.getQueryLogger() != null ? context.getQueryLogger() : QueryLogger.NO_OP_INSTANCE; } @Override @@ -181,6 +179,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { statement.getResultSets().remove(this); overAllQueryMetrics.endQuery(); overAllQueryMetrics.stopResultSetWatch(); + if (!queryLogger.isSynced()) { + if(this.exception==null){ + queryLogger.log(QueryLogInfo.QUERY_STATUS_I,QueryStatus.COMPLETED.toString()); + } + queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); + // if not already synced , like closing before result set exhausted + queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics()); + } } } @@ -799,27 +805,22 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { } rowProjector.reset(); } catch (RuntimeException e) { - this.logStatus=QueryLogState.FAILED; // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString()); + if (queryLogger.isDebugEnabled()) { + queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + } this.exception = e; if (e.getCause() instanceof SQLException) { throw (SQLException) e.getCause(); } throw e; }finally{ - if (currentRow == null && queryLogger != null ) { - if (queryLogger.isDebugEnabled()) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); - queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, - System.currentTimeMillis() - queryLogger.getStartTime()); - - if (this.exception != null) { - queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, - Throwables.getStackTraceAsString(this.exception)); - } - queryLogger.log(logStatus, queryLogBuilder.build()); + if (this.exception != null) { + queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); + if (queryLogger != null) { + queryLogger.sync(getReadMetrics(), getOverAllRequestReadMetrics()); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 29edb7f..5d58688 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -93,7 +93,7 @@ import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.log.QueryLogInfo; -import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryStatus; import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.log.QueryLoggerUtil; import org.apache.phoenix.optimize.Cost; @@ -188,8 +188,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.math.IntMath; @@ -316,10 +314,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { StatementContext context = plan.getContext(); context.setQueryLogger(queryLogger); if(queryLogger.isDebugEnabled()){ - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator)); - queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null); - queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build()); + queryLogger.log(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator)); + queryLogger.log(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null); } context.getOverallQueryMetrics().startQuery(); PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext()); @@ -348,6 +344,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } throw e; }catch (RuntimeException e) { + // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. if (e.getCause() instanceof SQLException) { @@ -364,11 +361,9 @@ public class PhoenixStatement implements Statement, SQLCloseable { }, PhoenixContextExecutor.inContext()); }catch (Exception e) { if (queryLogger.isDebugEnabled()) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, - System.currentTimeMillis() - queryLogger.getStartTime()); - queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); - queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build()); + queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString()); + queryLogger.sync(null, null); } Throwables.propagateIfInstanceOf(e, SQLException.class); Throwables.propagate(e); @@ -1695,7 +1690,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable); QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(), - connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters()); + connection.getQueryServices(), sql, getParameters()); return queryLogger; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java index 5792658..269b4f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java @@ -18,5 +18,5 @@ package org.apache.phoenix.log; public enum LogLevel { - OFF, INFO, DEBUG, TRACE + OFF,INFO, DEBUG, TRACE } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java index 817f9ec..dab03e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java @@ -31,16 +31,18 @@ public interface LogWriter { * @param event * @throws SQLException * @throws IOException + * @throws ClassNotFoundException */ - void write(RingBufferEvent event) throws SQLException, IOException; + void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException; /** * will be called when disruptor is getting shutdown * * @throws IOException + * @throws SQLException */ - void close() throws IOException; + void close() throws IOException, SQLException; /** * if writer is closed and cannot write further event http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java index 87de267..fb38ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java @@ -28,8 +28,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; import org.apache.phoenix.schema.types.PDataType; @@ -40,29 +40,27 @@ import org.apache.phoenix.schema.types.PVarchar; public enum QueryLogInfo { - CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE), - QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE), - QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE), - USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), - EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), - GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), - NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), - EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), - QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), - TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), - SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE); + CLIENT_IP_I(CLIENT_IP, LogLevel.INFO, PVarchar.INSTANCE), + QUERY_I(QUERY, LogLevel.INFO,PVarchar.INSTANCE), + BIND_PARAMETERS_I(BIND_PARAMETERS, LogLevel.TRACE,PVarchar.INSTANCE), + QUERY_ID_I(QUERY_ID, LogLevel.INFO,PVarchar.INSTANCE), + TENANT_ID_I(TENANT_ID, LogLevel.INFO,PVarchar.INSTANCE), + START_TIME_I(START_TIME, LogLevel.INFO,PTimestamp.INSTANCE), + USER_I(USER, LogLevel.INFO,PVarchar.INSTANCE), + EXPLAIN_PLAN_I(EXPLAIN_PLAN,LogLevel.DEBUG,PVarchar.INSTANCE), + GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS, LogLevel.DEBUG,PVarchar.INSTANCE), + NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED, LogLevel.INFO,PLong.INSTANCE), + EXCEPTION_TRACE_I(EXCEPTION_TRACE, LogLevel.DEBUG,PVarchar.INSTANCE), + QUERY_STATUS_I(QUERY_STATUS, LogLevel.INFO,PVarchar.INSTANCE), + SCAN_METRICS_JSON_I(SCAN_METRICS_JSON, LogLevel.TRACE,PVarchar.INSTANCE), + TABLE_NAME_I(TABLE_NAME, LogLevel.DEBUG,PVarchar.INSTANCE); public final String columnName; - public final QueryLogState logState; public final LogLevel logLevel; public final PDataType dataType; - private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) { + private QueryLogInfo(String columnName, LogLevel logLevel, PDataType dataType) { this.columnName = columnName; - this.logState=logState; this.logLevel=logLevel; this.dataType=dataType; } @@ -71,10 +69,6 @@ public enum QueryLogInfo { return columnName; } - public QueryLogState getLogState() { - return logState; - } - public LogLevel getLogLevel() { return logLevel; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java deleted file mode 100644 index e27f0e8..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.log; - -public enum QueryLogState { - STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java index b2fb235..ef5559c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java @@ -17,13 +17,17 @@ */ package org.apache.phoenix.log; +import java.util.Map; import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.util.EnvironmentEdgeManager; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import io.netty.util.internal.ThreadLocalRandom; @@ -34,15 +38,17 @@ public class QueryLogger { private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>(); private QueryLoggerDisruptor queryDisruptor; private String queryId; - private Long startTime; private LogLevel logLevel; + private Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + private boolean isSynced; private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class); private QueryLogger(PhoenixConnection connection) { this.queryId = UUID.randomUUID().toString(); this.queryDisruptor = connection.getQueryServices().getQueryDisruptor(); - this.startTime = System.currentTimeMillis(); logLevel = connection.getLogLevel(); + log(QueryLogInfo.QUERY_ID_I, queryId); + log(QueryLogInfo.START_TIME_I, EnvironmentEdgeManager.currentTimeMillis()); } private QueryLogger() { @@ -58,21 +64,32 @@ public class QueryLogger { return result; } - private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() { + public static final QueryLogger NO_OP_INSTANCE = new QueryLogger() { @Override - public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { + public void log(QueryLogInfo queryLogInfo, Object info) { } - + @Override - public boolean isDebugEnabled(){ + public boolean isDebugEnabled() { return false; } - + @Override - public boolean isInfoEnabled(){ + public boolean isInfoEnabled() { return false; } + + @Override + public void sync( + Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + + } + + @Override + public boolean isSynced(){ + return true; + } }; public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) { @@ -82,14 +99,14 @@ public class QueryLogger { } /** - * Add query log in the table, columns will be logged depending upon the connection logLevel - * @param logState State of the query - * @param map Value of the map should be in format of the corresponding data type + * Add query log in the table, columns will be logged depending upon the connection logLevel */ - public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { - final RingBufferEventTranslator translator = getCachedTranslator(); - translator.setQueryInfo(logState, map, logLevel); - publishLogs(translator); + public void log(QueryLogInfo queryLogInfo, Object info) { + try { + queryLogBuilder.put(queryLogInfo, info); + } catch (Exception e) { + LOG.warn("Unable to add log info because of " + e.getMessage()); + } } private boolean publishLogs(RingBufferEventTranslator translator) { @@ -102,13 +119,6 @@ public class QueryLogger { } /** - * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time - */ - public Long getStartTime() { - return startTime != null ? startTime : System.currentTimeMillis(); - } - - /** * Is debug logging currently enabled? * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug. */ @@ -117,7 +127,8 @@ public class QueryLogger { } private boolean isLevelEnabled(LogLevel logLevel){ - return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false; + return this.logLevel != null && logLevel != LogLevel.OFF ? logLevel.ordinal() <= this.logLevel.ordinal() + : false; } /** @@ -142,4 +153,21 @@ public class QueryLogger { return this.queryId; } + + public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + if (!isSynced) { + isSynced = true; + final RingBufferEventTranslator translator = getCachedTranslator(); + translator.setQueryInfo(logLevel, queryLogBuilder.build(), readMetrics, overAllMetrics); + publishLogs(translator); + } + } + + /** + * Is Synced already + */ + public boolean isSynced(){ + return this.isSynced; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java index d5c4878..21917ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java @@ -25,48 +25,36 @@ import org.apache.commons.lang.StringUtils; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PName; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; - public class QueryLoggerUtil { + public static void logInitialDetails(QueryLogger queryLogger, PName tenantId, ConnectionQueryServices queryServices, - String query, long startTime, List<Object> bindParameters) { + String query, List<Object> bindParameters) { try { - queryLogger.log(QueryLogState.STARTED, - getInitialDetails(tenantId, queryServices, query, startTime, bindParameters)); + String clientIP; + try { + clientIP = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + clientIP = "UnknownHost"; + } + + if (clientIP != null) { + queryLogger.log(QueryLogInfo.CLIENT_IP_I, clientIP); + } + if (query != null) { + queryLogger.log(QueryLogInfo.QUERY_I, query); + } + if (bindParameters != null) { + queryLogger.log(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ",")); + } + if (tenantId != null) { + queryLogger.log(QueryLogInfo.TENANT_ID_I, tenantId.getString()); + } + + queryLogger.log(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName() + : queryServices.getUser().getShortName()); } catch (Exception e) { - // Ignore for now - } - - } - - private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId, - ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) { - Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); - String clientIP; - try { - clientIP = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - clientIP = "UnknownHost"; + // Ignore } - - if (clientIP != null) { - queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP); - } - if (query != null) { - queryLogBuilder.put(QueryLogInfo.QUERY_I, query); - } - queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime); - if (bindParameters != null) { - queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters, ",")); - } - if (tenantId != null) { - queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString()); - } - - queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? queryServices.getUserName() - : queryServices.getUser().getShortName()); - return queryLogBuilder.build(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java new file mode 100644 index 0000000..0e634c1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryStatus.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +public enum QueryStatus { + COMPILED, COMPLETED,FAILED +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java index 96e4bf9..8854e68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java @@ -17,14 +17,19 @@ */ package org.apache.phoenix.log; +import java.util.Map; + +import org.apache.phoenix.monitoring.MetricType; + import com.google.common.collect.ImmutableMap; import com.lmax.disruptor.EventFactory; class RingBufferEvent { private String queryId; - private QueryLogState logState; private LogLevel connectionLogLevel; private ImmutableMap<QueryLogInfo, Object> queryInfo; + private Map<String, Map<MetricType, Long>> readMetrics; + private Map<MetricType, Long> overAllMetrics; public static final Factory FACTORY = new Factory(); @@ -40,7 +45,6 @@ import com.lmax.disruptor.EventFactory; } public void clear() { - this.logState=null; this.queryInfo=null; this.queryId=null; } @@ -53,10 +57,6 @@ import com.lmax.disruptor.EventFactory; public static Factory getFactory() { return FACTORY; } - - public QueryLogState getLogState() { - return logState; - } public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) { this.queryInfo=queryInfo; @@ -73,12 +73,6 @@ import com.lmax.disruptor.EventFactory; } - public void setLogState(QueryLogState logState) { - this.logState=logState; - - } - - public LogLevel getConnectionLogLevel() { return connectionLogLevel; } @@ -88,6 +82,26 @@ import com.lmax.disruptor.EventFactory; this.connectionLogLevel = connectionLogLevel; } + + public Map<String, Map<MetricType, Long>> getReadMetrics() { + return readMetrics; + } + + + public void setReadMetrics(Map<String, Map<MetricType, Long>> readMetrics) { + this.readMetrics = readMetrics; + } + + + public Map<MetricType, Long> getOverAllMetrics() { + return overAllMetrics; + } + + + public void setOverAllMetrics(Map<MetricType, Long> overAllMetrics) { + this.overAllMetrics = overAllMetrics; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java index 653ddd6..742f8e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java @@ -17,14 +17,19 @@ */ package org.apache.phoenix.log; +import java.util.Map; + +import org.apache.phoenix.monitoring.MetricType; + import com.google.common.collect.ImmutableMap; import com.lmax.disruptor.EventTranslator; class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> { private String queryId; - private QueryLogState logState; private ImmutableMap<QueryLogInfo, Object> queryInfo; private LogLevel connectionLogLevel; + private Map<String, Map<MetricType, Long>> readMetrics; + private Map<MetricType, Long> overAllMetrics; public RingBufferEventTranslator(String queryId) { this.queryId=queryId; @@ -34,20 +39,22 @@ class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> { public void translateTo(RingBufferEvent event, long sequence) { event.setQueryId(queryId); event.setQueryInfo(queryInfo); - event.setLogState(logState); + event.setReadMetrics(readMetrics); + event.setOverAllMetrics(overAllMetrics); event.setConnectionLogLevel(connectionLogLevel); clear(); } private void clear() { - setQueryInfo(null,null,null); + setQueryInfo(null,null,null,null); } - public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo, - LogLevel connectionLogLevel) { + public void setQueryInfo(LogLevel logLevel, ImmutableMap<QueryLogInfo, Object> queryInfo, Map<String, Map<MetricType, Long>> readMetrics, + Map<MetricType, Long> overAllMetrics) { this.queryInfo = queryInfo; - this.logState = logState; - this.connectionLogLevel = connectionLogLevel; + this.connectionLogLevel = logLevel; + this.readMetrics = readMetrics; + this.overAllMetrics=overAllMetrics; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java index 4398596..4969484 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java @@ -21,23 +21,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Map.Entry; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.expression.Determinism; -import org.apache.phoenix.expression.LiteralExpression; -import org.apache.phoenix.query.HBaseFactoryProvider; -import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.util.QueryUtil; import com.google.common.collect.ImmutableMap; @@ -47,58 +41,97 @@ import com.google.common.collect.ImmutableMap; */ public class TableLogWriter implements LogWriter { private static final Log LOG = LogFactory.getLog(LogWriter.class); - private HConnection connection; + private Connection connection; private boolean isClosed; - private HTableInterface table; private Configuration config; + private PreparedStatement upsertStatement; + private Map<MetricType,Integer> metricOrdinals=new HashMap<MetricType,Integer>(); public TableLogWriter(Configuration configuration) { - this.config = configuration; - try { - this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); - table = HBaseFactoryProvider.getHTableFactory() - .getTable(SchemaUtil - .getPhysicalTableName( - SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config) - .getName(), connection, null); - } catch (Exception e) { - LOG.warn("Unable to initiate LogWriter for writing query logs to table"); + this.config=configuration; + } + + private PreparedStatement buildUpsertStatement(Connection conn) throws SQLException { + StringBuilder buf = new StringBuilder("UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"("); + int queryLogEntries=0; + for (QueryLogInfo info : QueryLogInfo.values()) { + buf.append(info.columnName); + buf.append(','); + queryLogEntries++; + } + for (MetricType metric : MetricType.values()) { + if (metric.logLevel() != LogLevel.OFF) { + metricOrdinals.put(metric, ++queryLogEntries); + buf.append(metric.columnName()); + buf.append(','); + } } + buf.setLength(buf.length()-1); + buf.append(") VALUES ("); + for (int i = 0; i < QueryLogInfo.values().length; i++) { + buf.append("?,"); + } + for (MetricType metric : MetricType.values()) { + if (metric.logLevel() != LogLevel.OFF) { + buf.append("?,"); + } + } + buf.setLength(buf.length()-1); + buf.append(")"); + return conn.prepareStatement(buf.toString()); } @Override - public void write(RingBufferEvent event) throws SQLException, IOException { + public void write(RingBufferEvent event) throws SQLException, IOException, ClassNotFoundException { if(isClosed()){ LOG.warn("Unable to commit query log as Log committer is already closed"); return; } - if (table == null || connection == null) { - LOG.warn("Unable to commit query log as connection was not initiated "); - return; + if (connection == null) { + synchronized (this) { + if (connection == null) { + connection = QueryUtil.getConnectionForQueryLog(this.config); + this.upsertStatement = buildUpsertStatement(connection); + } + } } - ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo(); - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - Put put =new Put(Bytes.toBytes(event.getQueryId())); - for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) { - if (entry.getKey().logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) { - LiteralExpression expression = LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType, - Determinism.ALWAYS); - expression.evaluate(null, ptr); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(entry.getKey().columnName), - ByteUtil.copyKeyBytesIfNecessary(ptr)); + + ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo(); + for (QueryLogInfo info : QueryLogInfo.values()) { + if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) { + upsertStatement.setObject(info.ordinal() + 1, queryInfoMap.get(info)); + } else { + upsertStatement.setObject(info.ordinal() + 1, null); + } + } + Map<MetricType, Long> overAllMetrics = event.getOverAllMetrics(); + Map<String, Map<MetricType, Long>> readMetrics = event.getReadMetrics(); + + for (MetricType metric : MetricType.values()) { + if (overAllMetrics != null && overAllMetrics.containsKey(metric) + && metric.isLoggingEnabled(event.getConnectionLogLevel())) { + upsertStatement.setObject(metricOrdinals.get(metric), overAllMetrics.get(metric)); + } else { + if (metric.logLevel() != LogLevel.OFF) { + upsertStatement.setObject(metricOrdinals.get(metric), null); + } } } - - if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal() - && (event.getLogState() == QueryLogState.COMPLETED || event.getLogState() == QueryLogState.FAILED)) { - LiteralExpression expression = LiteralExpression.newConstant(event.getLogState().toString(), - QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS); - expression.evaluate(null, ptr); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), ByteUtil.copyKeyBytesIfNecessary(ptr)); + + if (readMetrics != null && !readMetrics.isEmpty()) { + for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) { + upsertStatement.setObject(QueryLogInfo.TABLE_NAME_I.ordinal() + 1, entry.getKey()); + for (MetricType metric : entry.getValue().keySet()) { + if (metric.isLoggingEnabled(event.getConnectionLogLevel())) { + upsertStatement.setObject(metricOrdinals.get(metric), entry.getValue().get(metric)); + } + } + upsertStatement.executeUpdate(); + } + } else { + upsertStatement.executeUpdate(); } - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - table.put(put); + connection.commit(); } @@ -109,18 +142,16 @@ public class TableLogWriter implements LogWriter { } isClosed=true; try { - if (table != null) { - table.close(); - } - if (connection != null && !connection.isClosed()) { - //It should internally close all the statements + if (connection != null) { + // It should internally close all the statements connection.close(); } - } catch (IOException e) { + } catch (SQLException e) { // TODO Ignore? } } + @Override public boolean isClosed(){ return isClosed; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index eafff99..4fed458 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -37,7 +37,15 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.iterate.*; +import org.apache.phoenix.iterate.ConcatResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.iterate.PeekingResultIterator; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.RoundRobinResultIterator; +import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.iterate.TableSnapshotResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.monitoring.ReadMetricQueue; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java index 0e82ce4..daa0bba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java @@ -26,7 +26,6 @@ import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME; public class MemoryMetricsHolder { private final CombinableMetric memoryChunkSizeMetric; private final CombinableMetric memoryWaitTimeMetric; - public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null); public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) { this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 45295a7..4582c5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -16,51 +16,62 @@ * limitations under the License. */ package org.apache.phoenix.monitoring; +import org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PLong; + +/** + * Keeping {@link LogLevel#OFF} for metrics which are calculated globally only and doesn't need to be logged in SYSTEM.LOG + */ public enum MetricType { - NO_OP_METRIC("no", "No op metric"), - // mutation (write) related metrics - MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch"), - MUTATION_BYTES("mb", "Size of mutations in bytes"), - MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations"), - MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed"), - MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements"), - // query (read) related metrics - QUERY_TIME("qt", "Query times"), - QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out"), - QUERY_FAILED_COUNTER("qf", "Number of times query failed"), - NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel"), - SCAN_BYTES("sb", "Number of bytes read by scans"), - SELECT_SQL_COUNTER("sc", "Counter for number of sql queries"), - // task metrics - TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor"), - TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion"), - TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute"), - TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor"), - TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor"), - // spool metrics - SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes"), - SPOOL_FILE_COUNTER("sn", "Number of spool files created"), - // misc metrics - MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager"), - MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager"), - CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits"), - WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution"), - RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()"), - OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections"), - QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated"), - HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver"), - PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " + - "because there are already too many to that target cluster."), - PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not."); + NO_OP_METRIC("no", "No op metric",LogLevel.OFF, PLong.INSTANCE), + // mutation (write) related metrics + MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE), + MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE), + MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE), + MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE), + MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE), + // query (read) related metrics + QUERY_TIME("qt", "Query times",LogLevel.OFF, PLong.INSTANCE), + QUERY_TIMEOUT_COUNTER("qo", "Number of times query timed out",LogLevel.DEBUG, PLong.INSTANCE), + QUERY_FAILED_COUNTER("qf", "Number of times query failed",LogLevel.DEBUG, PLong.INSTANCE), + NUM_PARALLEL_SCANS("ps", "Number of scans that were executed in parallel",LogLevel.DEBUG, PLong.INSTANCE), + SCAN_BYTES("sb", "Number of bytes read by scans",LogLevel.OFF, PLong.INSTANCE), + SELECT_SQL_COUNTER("sc", "Counter for number of sql queries",LogLevel.OFF, PLong.INSTANCE), + // task metrics + TASK_QUEUE_WAIT_TIME("tw", "Time in milliseconds tasks had to wait in the queue of the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), + TASK_END_TO_END_TIME("tee", "Time in milliseconds spent by tasks from creation to completion",LogLevel.DEBUG, PLong.INSTANCE), + TASK_EXECUTION_TIME("tx", "Time in milliseconds tasks took to execute",LogLevel.DEBUG, PLong.INSTANCE), + TASK_EXECUTED_COUNTER("te", "Counter for number of tasks submitted to the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), + TASK_REJECTED_COUNTER("tr", "Counter for number of tasks that were rejected by the thread pool executor",LogLevel.DEBUG, PLong.INSTANCE), + // spool metrics + SPOOL_FILE_SIZE("ss", "Size of spool files created in bytes",LogLevel.DEBUG, PLong.INSTANCE), + SPOOL_FILE_COUNTER("sn", "Number of spool files created",LogLevel.DEBUG, PLong.INSTANCE), + // misc metrics + MEMORY_CHUNK_BYTES("mc", "Number of bytes allocated by the memory manager",LogLevel.DEBUG, PLong.INSTANCE), + MEMORY_WAIT_TIME("mw", "Number of milliseconds threads needed to wait for memory to be allocated through memory manager",LogLevel.DEBUG, PLong.INSTANCE), + CACHE_REFRESH_SPLITS_COUNTER("cr", "Number of times cache was refreshed because of splits",LogLevel.DEBUG, PLong.INSTANCE), + WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), + RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), + OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), + QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), + HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE), + PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " + + "because there are already too many to that target cluster.",LogLevel.OFF, PLong.INSTANCE), + PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE); private final String description; private final String shortName; + private LogLevel logLevel; + private PDataType dataType; - private MetricType(String shortName, String description) { + private MetricType(String shortName, String description, LogLevel logLevel, PDataType dataType) { this.shortName = shortName; this.description = description; + this.logLevel=logLevel; + this.dataType=dataType; } public String description() { @@ -70,5 +81,34 @@ public enum MetricType { public String shortName() { return shortName; } + + public LogLevel logLevel() { + return logLevel; + } + + public PDataType dataType() { + return dataType; + } + + public String columnName() { + return name(); + } + + public boolean isLoggingEnabled(LogLevel connectionLogLevel){ + return logLevel() != LogLevel.OFF && (logLevel().ordinal() <= connectionLogLevel.ordinal()); + } + public static String getMetricColumnsDetails() { + StringBuilder buffer=new StringBuilder(); + for(MetricType metric:MetricType.values()){ + if (metric.logLevel() != LogLevel.OFF) { + buffer.append(metric.columnName()); + buffer.append(" "); + buffer.append(metric.dataType.getSqlTypeName()); + buffer.append(","); + } + } + return buffer.toString(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java new file mode 100644 index 0000000..1e5ac08 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricUtil.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; + +public class MetricUtil { + + public static CombinableMetric getCombinableMetric(LogLevel connectionLogLevel, MetricType type) { + if (!type.isLoggingEnabled(connectionLogLevel)) { return NoOpRequestMetric.INSTANCE; } + return new CombinableMetricImpl(type); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java index 3de2be1..1256f5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import org.apache.phoenix.log.LogLevel; + /** * Queue that tracks various writes/mutations related phoenix request metrics. */ @@ -81,12 +83,16 @@ public class MutationMetricQueue { * Class that holds together the various metrics associated with mutations. */ public static class MutationMetric { - private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE); - private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES); - private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME); - private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE); - - public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) { + private final CombinableMetric numMutations;; + private final CombinableMetric mutationsSizeBytes; + private final CombinableMetric totalCommitTimeForMutations; + private final CombinableMetric numFailedMutations; + + public MutationMetric(LogLevel connectionLogLevel, long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) { + this.numMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_SIZE); + this.mutationsSizeBytes =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BYTES); + this.totalCommitTimeForMutations =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_COMMIT_TIME); + this.numFailedMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_FAILED_SIZE); this.numMutations.change(numMutations); this.mutationsSizeBytes.change(mutationsSizeBytes); this.totalCommitTimeForMutations.change(commitTimeForMutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java index b995267..3121ecd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java @@ -27,7 +27,7 @@ import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS; import java.util.HashMap; import java.util.Map; -import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; +import org.apache.phoenix.log.LogLevel; /** * Class that represents the overall metrics associated with a query being executed by the phoenix. @@ -42,16 +42,15 @@ public class OverAllQueryMetrics { private final CombinableMetric queryFailed; private final CombinableMetric cacheRefreshedDueToSplits; - public OverAllQueryMetrics(boolean isMetricsEnabled) { - queryWatch = new MetricsStopWatch(isMetricsEnabled); - resultSetWatch = new MetricsStopWatch(isMetricsEnabled); - numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE; - wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE; - resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE; - queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE; - queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE; - cacheRefreshedDueToSplits = isMetricsEnabled ? new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER) - : NoOpRequestMetric.INSTANCE; + public OverAllQueryMetrics(LogLevel connectionLogLevel) { + queryWatch = new MetricsStopWatch(WALL_CLOCK_TIME_MS.isLoggingEnabled(connectionLogLevel)); + resultSetWatch = new MetricsStopWatch(RESULT_SET_TIME_MS.isLoggingEnabled(connectionLogLevel)); + numParallelScans = MetricUtil.getCombinableMetric(connectionLogLevel, NUM_PARALLEL_SCANS); + wallClockTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, WALL_CLOCK_TIME_MS); + resultSetTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, RESULT_SET_TIME_MS); + queryTimedOut = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_TIMEOUT_COUNTER); + queryFailed = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_FAILED_COUNTER); + cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER); } public void updateNumParallelScans(long numParallelScans) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java index 5c4238e..619776a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; import com.google.common.annotations.VisibleForTesting; @@ -36,19 +37,23 @@ public class ReadMetricQueue { private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>(); - private final boolean isRequestMetricsEnabled; - public ReadMetricQueue(boolean isRequestMetricsEnabled) { - this.isRequestMetricsEnabled = isRequestMetricsEnabled; + private LogLevel connectionLogLevel; + + public ReadMetricQueue(LogLevel connectionLogLevel) { + this.connectionLogLevel = connectionLogLevel; } public CombinableMetric allotMetric(MetricType type, String tableName) { - if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; } - MetricKey key = new MetricKey(type, tableName); - Queue<CombinableMetric> q = getMetricQueue(key); - CombinableMetric metric = getMetric(type); - q.offer(metric); - return metric; + if (type.isLoggingEnabled(connectionLogLevel)) { + MetricKey key = new MetricKey(type, tableName); + Queue<CombinableMetric> q = getMetricQueue(key); + CombinableMetric metric = getMetric(type); + q.offer(metric); + return metric; + } else { + return NoOpRequestMetric.INSTANCE; + } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java index 4373887..699982f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.monitoring; +import org.apache.phoenix.log.LogLevel; /** * Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a @@ -26,7 +27,7 @@ public class SpoolingMetricsHolder { private final CombinableMetric spoolFileSizeMetric; private final CombinableMetric numSpoolFileMetric; - public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), ""); + public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(LogLevel.OFF), ""); public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) { this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java index 98ff57c..6117b40 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java @@ -23,6 +23,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER; +import org.apache.phoenix.log.LogLevel; + /** * Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client @@ -35,7 +37,7 @@ public class TaskExecutionMetricsHolder { private final CombinableMetric taskExecutionTime; private final CombinableMetric numTasks; private final CombinableMetric numRejectedTasks; - public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), ""); + public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(LogLevel.OFF), ""); public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) { taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5607eef/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 26e22a5..3bc2f48 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2466,7 +2466,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Available for testing protected String getLogTableDDL() { - return QueryConstants.CREATE_LOG_METADATA; + return setSystemLogDDLProperties(QueryConstants.CREATE_LOG_METADATA); + } + + private String setSystemLogDDLProperties(String ddl) { + return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS)); + } private String setSystemDDLProperties(String ddl) {