PHOENIX-2715 Query Log (Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b291068b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b291068b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b291068b Branch: refs/heads/4.x-cdh5.14 Commit: b291068bc9c6e133c7bcb6dfe52dd4f96a76f2a1 Parents: 4d9cc92 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Tue Apr 10 07:53:31 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Fri Apr 13 23:26:16 2018 +0100 ---------------------------------------------------------------------- phoenix-core/pom.xml | 5 + .../end2end/QueryDatabaseMetaDataIT.java | 4 + .../apache/phoenix/end2end/QueryLoggerIT.java | 358 +++++++++++++++++++ .../end2end/TenantSpecificTablesDDLIT.java | 2 + .../phoenix/compile/StatementContext.java | 10 + .../phoenix/coprocessor/MetaDataProtocol.java | 5 +- .../phoenix/iterate/ScanningResultIterator.java | 18 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 17 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 18 + .../phoenix/jdbc/PhoenixPreparedStatement.java | 11 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 38 ++ .../apache/phoenix/jdbc/PhoenixStatement.java | 67 +++- .../java/org/apache/phoenix/log/LogLevel.java | 22 ++ .../java/org/apache/phoenix/log/LogWriter.java | 51 +++ .../log/QueryLogDetailsEventHandler.java | 63 ++++ .../org/apache/phoenix/log/QueryLogInfo.java | 87 +++++ .../org/apache/phoenix/log/QueryLogState.java | 22 ++ .../org/apache/phoenix/log/QueryLogger.java | 145 ++++++++ .../log/QueryLoggerDefaultExceptionHandler.java | 51 +++ .../phoenix/log/QueryLoggerDisruptor.java | 117 ++++++ .../org/apache/phoenix/log/QueryLoggerUtil.java | 62 ++++ .../org/apache/phoenix/log/RingBufferEvent.java | 93 +++++ .../phoenix/log/RingBufferEventTranslator.java | 53 +++ .../org/apache/phoenix/log/TableLogWriter.java | 125 +++++++ .../phoenix/monitoring/ReadMetricQueue.java | 44 ++- 
.../phoenix/monitoring/ScanMetricsHolder.java | 48 ++- .../phoenix/query/ConnectionQueryServices.java | 6 + .../query/ConnectionQueryServicesImpl.java | 35 +- .../query/ConnectionlessQueryServicesImpl.java | 18 + .../query/DelegateConnectionQueryServices.java | 14 + .../apache/phoenix/query/QueryConstants.java | 45 +++ .../org/apache/phoenix/query/QueryServices.java | 4 + .../phoenix/query/QueryServicesOptions.java | 9 +- pom.xml | 6 + 34 files changed, 1612 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index b07cbbb..8fe8a10 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -526,5 +526,10 @@ <artifactId>i18n-util</artifactId> <version>${i18n-util.version}</version> </dependency> + <dependency> + <groupId>com.lmax</groupId> + <artifactId>disruptor</artifactId> + <version>${disruptor.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java index a1bcf40..54cb5da 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java @@ -166,6 +166,10 @@ public class QueryDatabaseMetaDataIT extends ParallelStatsDisabledIT { assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); assertTrue(rs.next()); assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM")); + 
assertEquals(PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, rs.getString("TABLE_NAME")); + assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); + assertTrue(rs.next()); + assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM")); assertEquals(TYPE_SEQUENCE, rs.getString("TABLE_NAME")); assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java new file mode 100644 index 0000000..940ba6f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.InetAddress; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import 
org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT { + + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Enable request metric collection at the driver level + props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + // disable renewing leases as this will force spooling to happen. + props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + // need the non-test driver for some tests that check number of hconnections, etc. + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + + @Test + public void testDebugLogs() throws Exception { + String tableName = generateUniqueName(); + createTableAndInsertValues(tableName, true); + Properties props= new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); + Connection conn = DriverManager.getConnection(getUrl(),props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG); + String query = "SELECT * FROM " + tableName; + ResultSet rs = conn.createStatement().executeQuery(query); + StatementContext context = ((PhoenixResultSet)rs).getContext(); + String queryId = context.getQueryLogger().getQueryId(); + while (rs.next()) { + rs.getString(1); + rs.getString(2); + } + ResultSet explainRS = conn.createStatement().executeQuery("Explain " + query); + + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + int delay = 5000; + + 
// sleep for sometime to let query log committed + Thread.sleep(delay); + while (rs.next()) { + if (rs.getString(QUERY_ID).equals(queryId)) { + foundQueryLog = true; + assertEquals(rs.getString(BIND_PARAMETERS), null); + assertEquals(rs.getString(USER), System.getProperty("user.name")); + assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); + assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS)); + assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10); + assertEquals(rs.getString(QUERY), query); + assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); + assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(TENANT_ID), null); + assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); + assertTrue(rs.getString(SCAN_METRICS_JSON).contains("scanMetrics")); + assertEquals(rs.getString(EXCEPTION_TRACE),null); + }else{ + //confirm we are not logging system queries + assertFalse(rs.getString(QUERY).toString().contains(SYSTEM_CATALOG_SCHEMA)); + } + } + assertTrue(foundQueryLog); + conn.close(); + } + + @Test + public void testLogSampling() throws Exception { + String tableName = generateUniqueName(); + createTableAndInsertValues(tableName, true); + Properties props= new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); + props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5"); + Connection conn = DriverManager.getConnection(getUrl(),props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG); + String query = "SELECT * FROM " + tableName; + int count=100; + for (int i = 0; i < count; i++) { + conn.createStatement().executeQuery(query); + } + + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + ResultSet rs = conn.createStatement().executeQuery(logQuery); 
+ int delay = 5000; + + // sleep for sometime to let query log committed + Thread.sleep(delay); + int logCount=0; + while (rs.next()) { + logCount++; + } + + //sampling rate is 0.5 , but with lesser count, uniformity of thread random may not be perfect, so taking 0.75 for comparison + assertTrue(logCount != 0 && logCount < count * 0.75); + conn.close(); + } + + @Test + public void testInfoLogs() throws Exception{ + String tableName = generateUniqueName(); + createTableAndInsertValues(tableName, true); + Properties props= new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.INFO.name()); + Connection conn = DriverManager.getConnection(getUrl(),props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.INFO); + String query = "SELECT * FROM " + tableName; + + ResultSet rs = conn.createStatement().executeQuery(query); + StatementContext context = ((PhoenixResultSet)rs).getContext(); + String queryId = context.getQueryLogger().getQueryId(); + while (rs.next()) { + rs.getString(1); + rs.getString(2); + } + + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + int delay = 5000; + + // sleep for sometime to let query log committed + Thread.sleep(delay); + while (rs.next()) { + if (rs.getString(QUERY_ID).equals(queryId)) { + foundQueryLog = true; + assertEquals(rs.getString(USER), System.getProperty("user.name")); + assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); + assertEquals(rs.getString(EXPLAIN_PLAN), null); + assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); + assertEquals(rs.getString(QUERY), query); + assertEquals(rs.getString(QUERY_STATUS),null); + assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(TENANT_ID), null); + 
assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null); + } + } + assertTrue(foundQueryLog); + conn.close(); + } + + @Test + public void testWithLoggingOFF() throws Exception{ + String tableName = generateUniqueName(); + createTableAndInsertValues(tableName, true); + Properties props= new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name()); + Connection conn = DriverManager.getConnection(getUrl(),props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.OFF); + String query = "SELECT * FROM " + tableName; + + ResultSet rs = conn.createStatement().executeQuery(query); + StatementContext context = ((PhoenixResultSet)rs).getContext(); + String queryId = context.getQueryLogger().getQueryId(); + while (rs.next()) { + rs.getString(1); + rs.getString(2); + } + + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + int delay = 5000; + + // sleep for sometime to let query log committed + Thread.sleep(delay); + while (rs.next()) { + if (rs.getString(QUERY_ID).equals(queryId)) { + foundQueryLog = true; + } + } + assertFalse(foundQueryLog); + conn.close(); + } + + + @Test + public void testPreparedStatementWithTrace() throws Exception{ + testPreparedStatement(LogLevel.TRACE); + } + + @Test + public void testPreparedStatementWithDebug() throws Exception{ + testPreparedStatement(LogLevel.DEBUG); + } + + private void testPreparedStatement(LogLevel loglevel) throws Exception{ + String tableName = generateUniqueName(); + createTableAndInsertValues(tableName, true); + Properties props= new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, loglevel.name()); + Connection conn = DriverManager.getConnection(getUrl(),props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel); + + String query = "SELECT * FROM " + tableName +" where V = ?"; + + PreparedStatement pstmt = 
conn.prepareStatement(query); + pstmt.setString(1, "value5"); + ResultSet rs = pstmt.executeQuery(); + StatementContext context = ((PhoenixResultSet)rs).getContext(); + String queryId = context.getQueryLogger().getQueryId(); + while (rs.next()) { + rs.getString(1); + rs.getString(2); + } + ResultSet explainRS = conn.createStatement() + .executeQuery("Explain " + "SELECT * FROM " + tableName + " where V = 'value5'"); + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + int delay = 5000; + + // sleep for sometime to let query log committed + Thread.sleep(delay); + while (rs.next()) { + if (rs.getString(QUERY_ID).equals(queryId)) { + foundQueryLog = true; + assertEquals(rs.getString(BIND_PARAMETERS), loglevel == LogLevel.TRACE ? "value5" : null); + assertEquals(rs.getString(USER), System.getProperty("user.name")); + assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); + assertEquals(rs.getString(EXPLAIN_PLAN), QueryUtil.getExplainPlan(explainRS)); + assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), context.getScan().toJSON()); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1); + assertEquals(rs.getString(QUERY), query); + assertEquals(rs.getString(QUERY_STATUS), QueryLogState.COMPLETED.toString()); + assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertEquals(rs.getString(TENANT_ID), null); + assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); + } + } + assertTrue(foundQueryLog); + conn.close(); + } + + + + @Test + public void testFailedQuery() throws Exception { + String tableName = generateUniqueName(); + Properties props = new Properties(); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); + Connection conn = DriverManager.getConnection(getUrl(), props); + assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(), LogLevel.DEBUG); + 
// Table does not exists + String query = "SELECT * FROM " + tableName; + + try { + conn.createStatement().executeQuery(query); + fail(); + } catch (SQLException e) { + assertEquals(e.getErrorCode(), SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()); + } + String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\""; + ResultSet rs = conn.createStatement().executeQuery(logQuery); + boolean foundQueryLog = false; + int delay = 5000; + + // sleep for sometime to let query log committed + Thread.sleep(delay); + while (rs.next()) { + if (QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) { + foundQueryLog = true; + assertEquals(rs.getString(USER), System.getProperty("user.name")); + assertEquals(rs.getString(CLIENT_IP), InetAddress.getLocalHost().getHostAddress()); + assertEquals(rs.getString(EXPLAIN_PLAN), null); + assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), null); + assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0); + assertEquals(rs.getString(QUERY), query); + assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage())); + assertTrue(System.currentTimeMillis() - rs.getTimestamp(START_TIME).getTime() > delay); + assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null); + } + } + assertTrue(foundQueryLog); + conn.close(); + } + + private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate) + throws Exception { + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + // executing 10 upserts/mutations. 
+ String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= 10; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java index f8dfd65..34a1312 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java @@ -493,6 +493,8 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT { assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, SYSTEM_FUNCTION_TABLE, SYSTEM); assertTrue(rs.next()); + assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, PTableType.SYSTEM); + assertTrue(rs.next()); assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM); assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index 39d8525..c105046 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.parse.SelectStatement; @@ -83,6 +84,7 @@ public class StatementContext { private Map<SelectStatement, Object> subqueryResults; private final ReadMetricQueue readMetricsQueue; private final OverAllQueryMetrics overAllQueryMetrics; + private QueryLogger queryLogger; public StatementContext(PhoenixStatement statement) { this(statement, new Scan()); @@ -306,5 +308,13 @@ public class StatementContext { public OverAllQueryMetrics getOverallQueryMetrics() { return overAllQueryMetrics; } + + public void setQueryLogger(QueryLogger queryLogger) { + this.queryLogger=queryLogger; + } + + public QueryLogger getQueryLogger() { + return queryLogger; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index efad1e7..4c4c96f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -76,6 +76,8 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final boolean DEFAULT_META_DATA_KEEP_DELETED_CELLS = true; public static final int 
DEFAULT_MAX_STAT_DATA_VERSIONS = 1; public static final boolean DEFAULT_STATS_KEEP_DELETED_CELLS = false; + public static final int DEFAULT_LOG_VERSIONS = 10; + public static final int DEFAULT_LOG_TTL = 7 * 24 * 60 * 60; // 7 days // Min system table timestamps for every release. public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 = MIN_TABLE_TIMESTAMP + 3; @@ -118,7 +120,8 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, "4.14.x"); } - public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; + public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; + // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need // a different code for every type of error. http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 011feaa..9a31238 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -17,18 +17,18 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS; +import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES; -import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -93,7 +93,7 @@ public class ScanningResultIterator implements ResultIterator { if(scanMetricsMap == null) { return; } - + scanMetricsHolder.setScanMetricMap(scanMetricsMap); 
changeMetric(scanMetricsHolder.getCountOfRPCcalls(), scanMetricsMap.get(RPC_CALLS_METRIC_NAME)); changeMetric(scanMetricsHolder.getCountOfRemoteRPCcalls(), http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 2b428c9..d3626f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -75,11 +75,11 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.iterate.TableResultIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.ConnectionQueryServices.Feature; import org.apache.phoenix.query.DelegateConnectionQueryServices; import org.apache.phoenix.query.MetaDataMutated; import org.apache.phoenix.query.PropertyPolicyProvider; @@ -168,6 +168,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue; private TableResultIteratorFactory tableResultIteratorFactory; private boolean isRunningUpgrade; + private LogLevel logLevel; + private Double logSamplingRate; static { Tracing.addTraceMetricsSource(); @@ -378,6 +380,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.scannerQueue = new LinkedBlockingQueue<>(); 
this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory(); this.isRunningUpgrade = isRunningUpgrade; + this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, + QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, + QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); } @@ -648,6 +654,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } finally { services.removeConnection(this); } + } finally { isClosed = true; GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); @@ -1274,4 +1281,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.isRunningUpgrade = isRunningUpgrade; } + public LogLevel getLogLevel(){ + return this.logLevel; + } + + public Double getLogSamplingRate(){ + return this.logSamplingRate; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index b88b381..9caf7fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -343,6 +343,24 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String USE_STATS_FOR_PARALLELIZATION = "USE_STATS_FOR_PARALLELIZATION"; public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION); + + //SYSTEM:LOG + public static final String SYSTEM_LOG_TABLE = "LOG"; + public static final String QUERY_ID = "QUERY_ID"; + public static final String 
USER = "USER"; + public static final String CLIENT_IP = "CLIENT_IP"; + public static final String QUERY = "QUERY"; + public static final String EXPLAIN_PLAN = "EXPLAIN_PLAN"; + public static final String TOTAL_EXECUTION_TIME = "TOTAL_EXECUTION_TIME"; + public static final String NO_OF_RESULTS_ITERATED = "NO_OF_RESULTS_ITERATED"; + public static final String QUERY_STATUS = "QUERY_STATUS"; + public static final String EXCEPTION_TRACE = "EXCEPTION_TRACE"; + public static final String GLOBAL_SCAN_DETAILS = "GLOBAL_SCAN_DETAILS"; + public static final String SCAN_METRICS_JSON = "SCAN_METRICS_JSON"; + public static final String START_TIME = "START_TIME"; + public static final String BIND_PARAMETERS = "BIND_PARAMETERS"; + + PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index 71ecb8d..914ea33 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -169,7 +169,13 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) .build().buildException(); } - return execute(statement); + if (statement.getOperation().isMutation()) { + executeMutation(statement); + return false; + } + executeQuery(statement, 
createQueryLogger(statement,query)); + return true; + } @Override @@ -183,7 +189,8 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar if (statement.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(statement.getOperation()); } - return executeQuery(statement); + + return executeQuery(statement,createQueryLogger(statement,query)); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index d3ec151..153fa08 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -50,6 +50,9 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.log.QueryLogInfo; +import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.OverAllQueryMetrics; import org.apache.phoenix.monitoring.ReadMetricQueue; @@ -72,6 +75,9 @@ import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.SQLCloseable; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; @@ -122,6 +128,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private boolean isClosed = false; private boolean wasNull = false; private boolean firstRecordRead = false; + + 
private QueryLogger queryLogger; + + private Long count = 0L; + + private QueryLogState logStatus = QueryLogState.COMPLETED; + + private RuntimeException exception; public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException { this.rowProjector = rowProjector; @@ -130,6 +144,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { this.statement = context.getStatement(); this.readMetricsQueue = context.getReadMetricsQueue(); this.overAllQueryMetrics = context.getOverallQueryMetrics(); + this.queryLogger = context.getQueryLogger(); } @Override @@ -779,17 +794,39 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { currentRow = scanner.next(); if (currentRow == null) { close(); + }else{ + count++; } rowProjector.reset(); } catch (RuntimeException e) { + this.logStatus=QueryLogState.FAILED; // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. + this.exception = e; if (e.getCause() instanceof SQLException) { throw (SQLException) e.getCause(); } throw e; + }finally{ + if (currentRow == null && queryLogger != null ) { + if (queryLogger.isDebugEnabled()) { + Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, count); + queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, + System.currentTimeMillis() - queryLogger.getStartTime()); + queryLogBuilder.put(QueryLogInfo.SCAN_METRICS_JSON_I, + readMetricsQueue.getScanMetricsHolderList().toString()); + if (this.exception != null) { + queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, + Throwables.getStackTraceAsString(this.exception)); + } + readMetricsQueue.getScanMetricsHolderList().clear(); + queryLogger.log(logStatus, queryLogBuilder.build()); + } + } } if (currentRow == null) { + overAllQueryMetrics.endQuery(); overAllQueryMetrics.stopResultSetWatch(); } @@ -1301,6 +1338,7 @@ public class 
PhoenixResultSet implements ResultSet, SQLCloseable { public void resetMetrics() { readMetricsQueue.clearMetrics(); + readMetricsQueue.getScanMetricsHolderList().clear(); overAllQueryMetrics.reset(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 4a692d3..f526419 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -92,6 +92,10 @@ import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.log.QueryLogInfo; +import org.apache.phoenix.log.QueryLogState; +import org.apache.phoenix.log.QueryLogger; +import org.apache.phoenix.log.QueryLoggerUtil; import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AddJarsStatement; @@ -186,6 +190,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.math.IntMath; @@ -269,26 +275,19 @@ public class PhoenixStatement implements Statement, SQLCloseable { return new PhoenixResultSet(iterator, projector, context); } - protected boolean execute(final CompilableStatement stmt) throws SQLException { - if (stmt.getOperation().isMutation()) { - executeMutation(stmt); - return false; - } - 
executeQuery(stmt); - return true; - } - protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException { QueryPlan plan = stmt.compilePlan(this, Sequence.ValueOp.VALIDATE_SEQUENCE); return connection.getQueryServices().getOptimizer().optimize(this, plan); } - protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException { - return executeQuery(stmt,true); + protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger) + throws SQLException { + return executeQuery(stmt, true, queryLogger); } private PhoenixResultSet executeQuery(final CompilableStatement stmt, - final boolean doRetryOnMetaNotFoundError) throws SQLException { + final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger) throws SQLException { GLOBAL_SELECT_SQL_COUNTER.increment(); + try { return CallRunner.run( new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() { @@ -297,6 +296,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { final long startTime = System.currentTimeMillis(); try { PhoenixConnection conn = getConnection(); + if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade() && stmt.getOperation() != Operation.UPGRADE) { throw new UpgradeRequiredException(); @@ -317,6 +317,13 @@ public class PhoenixStatement implements Statement, SQLCloseable { logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection)); } StatementContext context = plan.getContext(); + context.setQueryLogger(queryLogger); + if(queryLogger.isDebugEnabled()){ + Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, QueryUtil.getExplainPlan(resultIterator)); + queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, context.getScan()!=null?context.getScan().toString():null); + queryLogger.log(QueryLogState.COMPILED, queryLogBuilder.build()); + } 
context.getOverallQueryMetrics().startQuery(); PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext()); resultSets.add(rs); @@ -338,7 +345,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { logger.debug("Reloading table "+ e.getTableName()+" data from server"); if(new MetaDataClient(connection).updateCache(connection.getTenantId(), e.getSchemaName(), e.getTableName(), true).wasUpdated()){ - return executeQuery(stmt, false); + //TODO we can log retry count and error for debugging in LOG table + return executeQuery(stmt, false, queryLogger); } } throw e; @@ -358,6 +366,13 @@ public class PhoenixStatement implements Statement, SQLCloseable { } }, PhoenixContextExecutor.inContext()); }catch (Exception e) { + if (queryLogger.isDebugEnabled()) { + Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I, + System.currentTimeMillis() - queryLogger.getStartTime()); + queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build()); + } Throwables.propagateIfInstanceOf(e, SQLException.class); Throwables.propagate(e); throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws @@ -1750,16 +1765,37 @@ public class PhoenixStatement implements Statement, SQLCloseable { return compileMutation(stmt, sql); } + public QueryLogger createQueryLogger(CompilableStatement stmt, String sql) throws SQLException { + boolean isSystemTable=false; + if(stmt instanceof ExecutableSelectStatement){ + TableNode from = ((ExecutableSelectStatement)stmt).getFrom(); + if(from instanceof NamedTableNode){ + String schemaName = ((NamedTableNode)from).getName().getSchemaName(); + if(schemaName==null){ + schemaName=connection.getSchema(); + } + if(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName)){ + isSystemTable=true; + } + } + } + QueryLogger 
queryLogger = QueryLogger.getInstance(connection,isSystemTable); + QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(), + connection.getQueryServices(), sql, queryLogger.getStartTime(), getParameters()); + return queryLogger; + } + @Override public ResultSet executeQuery(String sql) throws SQLException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Execute query: " + sql, connection)); } + CompilableStatement stmt = parseStatement(sql); if (stmt.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(sql); } - return executeQuery(stmt); + return executeQuery(stmt,createQueryLogger(stmt,sql)); } @Override @@ -1795,7 +1831,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { flushIfNecessary(); return false; } - executeQuery(stmt); + + executeQuery(stmt,createQueryLogger(stmt,sql)); return true; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java new file mode 100644 index 0000000..5792658 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Verbosity levels for Phoenix query logging.
 *
 * <p>The constants are declared from least verbose ({@link #OFF}) to most verbose ({@link #TRACE}).
 * The declaration order is significant: {@code QueryLogger} compares ordinals to decide whether a
 * given level is enabled, so new constants must be inserted at the position matching their verbosity.
 */
public enum LogLevel {
    /** Query logging is disabled. */
    OFF,
    /** Basic query details are logged. */
    INFO,
    /** Everything logged at {@link #INFO} plus debug-level details. */
    DEBUG,
    /** The most verbose level. */
    TRACE
}
+ */ +package org.apache.phoenix.log; + +import java.io.IOException; +import java.sql.SQLException; + +/** + * Used by the event handler to write RingBufferEvent, this is done in a seperate thread from the application configured + * during disruptor + */ +public interface LogWriter { + /** + * Called by ring buffer event handler to write RingBufferEvent + * + * @param event + * @throws SQLException + * @throws IOException + */ + void write(RingBufferEvent event) throws SQLException, IOException; + + /** + * will be called when disruptor is getting shutdown + * + * @throws IOException + */ + + void close() throws IOException; + + /** + * if writer is closed and cannot write further event + * + * @return + */ + boolean isClosed(); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java new file mode 100644 index 0000000..ee6b2d6 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; + +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.SequenceReportingEventHandler; + + +public class QueryLogDetailsEventHandler implements SequenceReportingEventHandler<RingBufferEvent>, LifecycleAware { + private Sequence sequenceCallback; + private LogWriter logWriter; + + public QueryLogDetailsEventHandler(Configuration configuration) throws SQLException{ + this.logWriter = new TableLogWriter(configuration); + } + + @Override + public void setSequenceCallback(final Sequence sequenceCallback) { + this.sequenceCallback = sequenceCallback; + } + + @Override + public void onEvent(final RingBufferEvent event, final long sequence, final boolean endOfBatch) throws Exception { + logWriter.write(event); + event.clear(); + } + + @Override + public void onStart() { + } + + @Override + public void onShutdown() { + try { + if (logWriter != null) { + logWriter.close(); + } + } catch (Exception e) { + //Ignore + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java new file mode 100644 index 0000000..87de267 --- /dev/null +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; +import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; + +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PVarchar; + + +public enum QueryLogInfo { + + CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, PVarchar.INSTANCE), + QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), + BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, LogLevel.TRACE,PVarchar.INSTANCE), + QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), + TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), + START_TIME_I(START_TIME,QueryLogState.STARTED, LogLevel.INFO,PTimestamp.INSTANCE), + USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE), + EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), + GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, LogLevel.DEBUG,PVarchar.INSTANCE), + NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), + EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), + QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE), + TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, LogLevel.DEBUG,PLong.INSTANCE), + SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, LogLevel.DEBUG,PVarchar.INSTANCE); + + public final String columnName; + public final QueryLogState logState; + public final LogLevel logLevel; + public final PDataType dataType; + + private QueryLogInfo(String columnName, QueryLogState logState, LogLevel logLevel, PDataType dataType) { + this.columnName = columnName; + this.logState=logState; + this.logLevel=logLevel; + this.dataType=dataType; + } + + public String getColumnName() { + return columnName; + } + + public QueryLogState 
getLogState() { + return logState; + } + + public LogLevel getLogLevel() { + return logLevel; + } + + public PDataType getDataType() { + return dataType; + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java new file mode 100644 index 0000000..e27f0e8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * States a query can be in when a query log record is written.
 *
 * <p>The declaration order is kept stable so that ordinals and {@code values()} iteration do not
 * change.
 */
public enum QueryLogState {
    STARTED,
    PLAN,
    COMPILED,
    EXECUTION,
    COMPLETED,
    FAILED
}
+ */ +package org.apache.phoenix.log; + +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.jdbc.PhoenixConnection; + +import com.google.common.collect.ImmutableMap; + +import io.netty.util.internal.ThreadLocalRandom; + +/* + * Wrapper for query translator + */ +public class QueryLogger { + private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator = new ThreadLocal<>(); + private QueryLoggerDisruptor queryDisruptor; + private String queryId; + private Long startTime; + private LogLevel logLevel; + private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class); + + private QueryLogger(PhoenixConnection connection) { + this.queryId = UUID.randomUUID().toString(); + this.queryDisruptor = connection.getQueryServices().getQueryDisruptor(); + this.startTime = System.currentTimeMillis(); + logLevel = connection.getLogLevel(); + } + + private QueryLogger() { + logLevel = LogLevel.OFF; + } + + private RingBufferEventTranslator getCachedTranslator() { + RingBufferEventTranslator result = threadLocalTranslator.get(); + if (result == null) { + result = new RingBufferEventTranslator(queryId); + threadLocalTranslator.set(result); + } + return result; + } + + private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() { + @Override + public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { + + } + + @Override + public boolean isDebugEnabled(){ + return false; + } + + @Override + public boolean isInfoEnabled(){ + return false; + } + }; + + public static QueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) { + if (connection.getLogLevel() == LogLevel.OFF || isSystemTable || ThreadLocalRandom.current() + .nextDouble() > connection.getLogSamplingRate()) { return NO_OP_INSTANCE; } + return new QueryLogger(connection); + } + + /** + * Add query log in the table, columns will be logged depending upon the 
connection logLevel + * @param logState State of the query + * @param map Value of the map should be in format of the corresponding data type + */ + public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> map) { + final RingBufferEventTranslator translator = getCachedTranslator(); + translator.setQueryInfo(logState, map, logLevel); + publishLogs(translator); + } + + private boolean publishLogs(RingBufferEventTranslator translator) { + if (queryDisruptor == null) { return false; } + boolean isLogged = queryDisruptor.tryPublish(translator); + if (!isLogged && LOG.isDebugEnabled()) { + LOG.debug("Unable to write query log in table as ring buffer queue is full!!"); + } + return isLogged; + } + + /** + * Start time when the logger was started, if {@link LogLevel#OFF} then it's the current time + */ + public Long getStartTime() { + return startTime != null ? startTime : System.currentTimeMillis(); + } + + /** + * Is debug logging currently enabled? + * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than debug. + */ + public boolean isDebugEnabled(){ + return isLevelEnabled(LogLevel.DEBUG); + } + + private boolean isLevelEnabled(LogLevel logLevel){ + return this.logLevel != null ? logLevel.ordinal() <= this.logLevel.ordinal() : false; + } + + /** + * Is Info logging currently enabled? + * Call this method to prevent having to perform expensive operations (for example, String concatenation) when the log level is more than info. + * @return + */ + public boolean isInfoEnabled(){ + return isLevelEnabled(LogLevel.INFO); + } + + /** + * Return queryId of the current query logger , needed by the application + * to correlate with the logging table. 
+ * Eg(usage):- + * StatementContext context = ((PhoenixResultSet)rs).getContext(); + * String queryId = context.getQueryLogger().getQueryId(); + * + * @return + */ + public String getQueryId() { + return this.queryId; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java new file mode 100644 index 0000000..e9ae6bd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.log; + +import com.lmax.disruptor.ExceptionHandler; + +class QueryLoggerDefaultExceptionHandler implements ExceptionHandler<RingBufferEvent> { + + @Override + public void handleEventException(Throwable ex, long sequence, RingBufferEvent event) { + final StringBuilder sb = new StringBuilder(512); + sb.append("Query Logger error handling event seq=").append(sequence).append(", value='"); + try { + sb.append(event); + } catch (final Exception ignored) { + sb.append("[ERROR calling ").append(event.getClass()).append(".toString(): "); + sb.append(ignored).append("]"); + } + sb.append("':"); + System.err.println(sb); + ex.printStackTrace(); + } + + @Override + public void handleOnStartException(final Throwable throwable) { + System.err.println("QueryLogger error starting:"); + throwable.printStackTrace(); + } + + @Override + public void handleOnShutdownException(final Throwable throwable) { + System.err.println("QueryLogger error shutting down:"); + throwable.printStackTrace(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java new file mode 100644 index 0000000..b548d6c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.query.QueryServices; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +public class QueryLoggerDisruptor implements Closeable{ + + private volatile Disruptor<RingBufferEvent> disruptor; + private boolean isClosed = false; + //number of elements to create within the ring buffer. 
+ private static final int RING_BUFFER_SIZE = 256 * 1024; + private static final Log LOG = LogFactory.getLog(QueryLoggerDisruptor.class); + private static final String DEFAULT_WAIT_STRATEGY = BlockingWaitStrategy.class.getName(); + + public QueryLoggerDisruptor(Configuration configuration) throws SQLException{ + WaitStrategy waitStrategy; + try { + waitStrategy = (WaitStrategy)Class + .forName(configuration.get(QueryServices.LOG_BUFFER_WAIT_STRATEGY, DEFAULT_WAIT_STRATEGY)).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new SQLException(e); + } + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("QueryLogger" + "-thread-%s") + .setDaemon(true) + .setThreadFactory(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread result = Executors.defaultThreadFactory().newThread(r); + result.setContextClassLoader(QueryLoggerDisruptor.class.getClass().getClassLoader()); + return result; + } + }) + .build(); + disruptor = new Disruptor<RingBufferEvent>(RingBufferEvent.FACTORY, + configuration.getInt(QueryServices.LOG_BUFFER_SIZE, RING_BUFFER_SIZE), threadFactory, ProducerType.MULTI, + waitStrategy); + final ExceptionHandler<RingBufferEvent> errorHandler = new QueryLoggerDefaultExceptionHandler(); + disruptor.setDefaultExceptionHandler(errorHandler); + + final QueryLogDetailsEventHandler[] handlers = { new QueryLogDetailsEventHandler(configuration) }; + disruptor.handleEventsWith(handlers); + LOG.info("Starting QueryLoggerDisruptor for with ringbufferSize=" + disruptor.getRingBuffer().getBufferSize() + + ", waitStrategy=" + waitStrategy.getClass().getSimpleName() + ", " + "exceptionHandler=" + + errorHandler + "..."); + disruptor.start(); + + } + + /** + * Attempts to publish an event by translating (write) data representations into events claimed from the RingBuffer. 
+ * @param translator + * @return + */ + public boolean tryPublish(final EventTranslator<RingBufferEvent> translator) { + if(isClosed()){ + return false; + } + return disruptor.getRingBuffer().tryPublishEvent(translator); + } + + + public boolean isClosed() { + return isClosed ; + } + + @Override + public void close() throws IOException { + isClosed = true; + LOG.info("Shutting down QueryLoggerDisruptor.."); + try { + //we can wait for 2 seconds, so that backlog can be committed + disruptor.shutdown(2, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new IOException(e); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java new file mode 100644 index 0000000..2f22931 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.log; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.schema.PName; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; + +public class QueryLoggerUtil { + + public static void logInitialDetails(QueryLogger queryLogger, PName tenantId, + ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) { + queryLogger.log(QueryLogState.STARTED, + getInitialDetails(tenantId, queryServices, query, startTime, bindParameters)); + + } + + private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName tenantId, + ConnectionQueryServices queryServices, String query, long startTime, List<Object> bindParameters) { + Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder(); + String clientIP; + try { + clientIP = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + clientIP = "UnknownHost"; + } + queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP); + queryLogBuilder.put(QueryLogInfo.QUERY_I, query); + queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime); + if (bindParameters != null) { + queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, StringUtils.join(bindParameters,",")); + } + if (tenantId != null) { + queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, tenantId.getString()); + } + queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() != null ? 
queryServices.getUserName() + : queryServices.getUser().getShortName()); + return queryLogBuilder.build(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java new file mode 100644 index 0000000..96e4bf9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.log; + +import com.google.common.collect.ImmutableMap; +import com.lmax.disruptor.EventFactory; + + class RingBufferEvent { + private String queryId; + private QueryLogState logState; + private LogLevel connectionLogLevel; + private ImmutableMap<QueryLogInfo, Object> queryInfo; + + public static final Factory FACTORY = new Factory(); + + /** + * Creates the events that will be put in the RingBuffer. 
+ */ + private static class Factory implements EventFactory<RingBufferEvent> { + @Override + public RingBufferEvent newInstance() { + final RingBufferEvent result = new RingBufferEvent(); + return result; + } + } + + public void clear() { + this.logState=null; + this.queryInfo=null; + this.queryId=null; + } + + + public String getQueryId() { + return queryId; + } + + public static Factory getFactory() { + return FACTORY; + } + + public QueryLogState getLogState() { + return logState; + } + + public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) { + this.queryInfo=queryInfo; + + } + + public void setQueryId(String queryId) { + this.queryId=queryId; + + } + + public ImmutableMap<QueryLogInfo, Object> getQueryInfo() { + return queryInfo; + + } + + public void setLogState(QueryLogState logState) { + this.logState=logState; + + } + + + public LogLevel getConnectionLogLevel() { + return connectionLogLevel; + } + + + public void setConnectionLogLevel(LogLevel connectionLogLevel) { + this.connectionLogLevel = connectionLogLevel; + } + + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b291068b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java new file mode 100644 index 0000000..653ddd6 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.log;

import com.google.common.collect.ImmutableMap;
import com.lmax.disruptor.EventTranslator;

/**
 * Copies one pending log entry into a {@link RingBufferEvent} slot.
 * <p>
 * The query id is fixed at construction. The state, details and connection log level are
 * supplied through {@link #setQueryInfo} and are reset to {@code null} after each
 * {@link #translateTo} call.
 */
class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
    private String queryId;
    private QueryLogState logState;
    private ImmutableMap<QueryLogInfo, Object> queryInfo;
    private LogLevel connectionLogLevel;

    /**
     * @param queryId id written into every event this translator populates
     */
    public RingBufferEventTranslator(String queryId) {
        this.queryId = queryId;
    }

    /**
     * Writes the query id and the currently held state, details and log level into
     * {@code event}, then drops the held values (the query id is kept).
     */
    @Override
    public void translateTo(RingBufferEvent event, long sequence) {
        event.setQueryId(this.queryId);
        event.setQueryInfo(this.queryInfo);
        event.setLogState(this.logState);
        event.setConnectionLogLevel(this.connectionLogLevel);
        // Release the per-entry payload once it has been handed to the slot.
        setQueryInfo(null, null, null);
    }

    /**
     * Stores the payload for the next {@link #translateTo} call.
     *
     * @param logState state of the query being logged
     * @param queryInfo details to record
     * @param connectionLogLevel log level of the originating connection
     */
    public void setQueryInfo(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> queryInfo,
            LogLevel connectionLogLevel) {
        this.logState = logState;
        this.queryInfo = queryInfo;
        this.connectionLogLevel = connectionLogLevel;
    }

}