http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
new file mode 100644
index 0000000..c102855
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * {@link LogWriter} that stores RingBuffer query-log events as rows of the Phoenix log table.
+ * <p>
+ * Writing is best effort: if the HBase connection or table cannot be obtained at construction
+ * time, the writer is still created and {@link #write(RingBufferEvent)} drops events with a warning.
+ */
+public class TableLogWriter implements LogWriter {
+    private static final Log LOG = LogFactory.getLog(TableLogWriter.class);
+    private Connection connection;
+    private boolean isClosed;
+    private Table table;
+    private Configuration config;
+
+    /**
+     * Opens an HBase connection and looks up the physical table backing the query log.
+     * Failures are logged with their cause and leave the writer in a state where writes are skipped.
+     *
+     * @param configuration used to create the connection and to resolve the physical table name
+     */
+    public TableLogWriter(Configuration configuration) {
+        this.config = configuration;
+        try {
+            this.connection = ConnectionFactory.createConnection(configuration);
+            table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
+                    SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config));
+        } catch (Exception e) {
+            // Query logging must not break the caller; keep the cause so the failure can be diagnosed.
+            LOG.warn("Unable to initiate LogWriter for writing query logs to table", e);
+        }
+    }
+
+    /**
+     * Builds one {@link Put} keyed by the event's query id and writes it to the log table.
+     * Only attributes whose log level does not exceed the connection's log level are stored.
+     *
+     * @param event the query log event to persist
+     * @throws SQLException propagated from building the literal for a column value
+     * @throws IOException propagated from the table put
+     */
+    @Override
+    public void write(RingBufferEvent event) throws SQLException, IOException {
+        if (isClosed()) {
+            LOG.warn("Unable to commit query log as Log committer is already closed");
+            return;
+        }
+        if (table == null || connection == null) {
+            LOG.warn("Unable to commit query log as connection was not initiated ");
+            return;
+        }
+        ImmutableMap<QueryLogInfo, Object> queryInfo = event.getQueryInfo();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        Put put = new Put(Bytes.toBytes(event.getQueryId()));
+        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
+            if (entry.getKey().logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) {
+                addColumn(put, ptr, entry.getKey(), entry.getValue());
+            }
+        }
+
+        // The query status comes from the event state and is recorded only for COMPLETED or FAILED events.
+        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()
+                && (event.getLogState() == QueryLogState.COMPLETED || event.getLogState() == QueryLogState.FAILED)) {
+            addColumn(put, ptr, QueryLogInfo.QUERY_STATUS_I, event.getLogState().toString());
+        }
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        table.put(put);
+    }
+
+    /** Serializes {@code value} with the column's data type and adds it to {@code put}. */
+    private static void addColumn(Put put, ImmutableBytesWritable ptr, QueryLogInfo info, Object value)
+            throws SQLException {
+        LiteralExpression expression = LiteralExpression.newConstant(value, info.dataType, Determinism.ALWAYS);
+        expression.evaluate(null, ptr);
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(info.columnName),
+                ByteUtil.copyKeyBytesIfNecessary(ptr));
+    }
+
+    /**
+     * Closes the table and then the connection. Closing is best effort: a failure on either
+     * resource is logged and does not prevent the other one from being closed. Calling this
+     * method again after the first call has no effect.
+     */
+    @Override
+    public void close() throws IOException {
+        if (isClosed()) {
+            return;
+        }
+        isClosed = true;
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close the query log table", e);
+        } finally {
+            try {
+                if (connection != null && !connection.isClosed()) {
+                    // It should internally close all the statements
+                    connection.close();
+                }
+            } catch (IOException e) {
+                LOG.warn("Unable to close the query log connection", e);
+            }
+        }
+    }
+
+    /** @return {@code true} once {@link #close()} has been called */
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java index 4fd1194..c008635 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java @@ -1,27 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.monitoring; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; @@ -44,6 +38,8 @@ public class ReadMetricQueue { private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>(); + private final List<ScanMetricsHolder> scanMetricsHolderList = new ArrayList<ScanMetricsHolder>(); + private final boolean isRequestMetricsEnabled; public ReadMetricQueue(boolean isRequestMetricsEnabled) { @@ -85,7 +81,7 @@ public class ReadMetricQueue { } return publishedMetrics; } - + public void clearMetrics() { metricsMap.clear(); // help gc } @@ -177,8 +173,18 @@ public class ReadMetricQueue { return q; } - public boolean isRequestMetricsEnabled() { - return isRequestMetricsEnabled; - } + public boolean isRequestMetricsEnabled() { + return isRequestMetricsEnabled; + } + + public void addScanHolder(ScanMetricsHolder holder){ + scanMetricsHolderList.add(holder); + } + + public List<ScanMetricsHolder> getScanMetricsHolderList() { + return scanMetricsHolderList; + } + + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java index 6bcd402..9125cd8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java @@ -17,20 +17,23 @@ */ package org.apache.phoenix.monitoring; -import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; -import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS; import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS; import static org.apache.phoenix.monitoring.MetricType.COUNT_NOT_SERVING_REGION_EXCEPTION; -import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS; -import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES; +import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED; +import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED; +import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; -import org.apache.hadoop.hbase.client.Scan; +import java.io.IOException; +import java.util.Map; -import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; -import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES; -import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED; -import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED; +import 
org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.JsonMapper; public class ScanMetricsHolder { @@ -45,9 +48,11 @@ public class ScanMetricsHolder { private final CombinableMetric countOfRemoteRPCRetries; private final CombinableMetric countOfRowsScanned; private final CombinableMetric countOfRowsFiltered; + private Map<String, Long> scanMetricMap; + private Object scan; private static final ScanMetricsHolder NO_OP_INSTANCE = - new ScanMetricsHolder(new ReadMetricQueue(false), ""); + new ScanMetricsHolder(new ReadMetricQueue(false), "",null); public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName, Scan scan, boolean isRequestMetricsEnabled) { @@ -55,10 +60,12 @@ public class ScanMetricsHolder { return NO_OP_INSTANCE; } scan.setScanMetricsEnabled(true); - return new ScanMetricsHolder(readMetrics, tableName); + return new ScanMetricsHolder(readMetrics, tableName, scan); } - private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName) { + private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName,Scan scan) { + readMetrics.addScanHolder(this); + this.scan=scan; countOfRPCcalls = readMetrics.allotMetric(COUNT_RPC_CALLS, tableName); countOfRemoteRPCcalls = readMetrics.allotMetric(COUNT_REMOTE_RPC_CALLS, tableName); sumOfMillisSecBetweenNexts = readMetrics.allotMetric(COUNT_MILLS_BETWEEN_NEXTS, tableName); @@ -118,4 +125,21 @@ public class ScanMetricsHolder { return countOfRowsScanned; } + public Map<String, Long> getScanMetricMap() { + return scanMetricMap; + } + + public void setScanMetricMap(Map<String, Long> scanMetricMap) { + this.scanMetricMap = scanMetricMap; + } + + @Override + public String toString() { + try { + return "{\"scan\":" + scan + ", \"scanMetrics\":" + JsonMapper.writeObjectAsString(scanMetricMap) + "}"; + } catch (IOException e) { + return "{\"Exception while converting scan metrics to Json\":\"" + e.getMessage() + "\"}"; + } + } + } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 90f8089..0b72ada 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -146,4 +148,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated void upgradeSystemTables(String url, Properties props) throws SQLException; public Configuration getConfiguration(); + + public User getUser(); + + public QueryLoggerDisruptor getQueryDisruptor(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 6df2f80..8c7441a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -124,6 +124,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -183,6 +184,7 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.protobuf.ProtobufUtil; @@ -267,6 +269,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // don't need. 
private final ReadOnlyProps props; private final String userName; + private final User user; private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; private final GuidePostsCache tableStatsCache; @@ -336,6 +339,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; } }); + private QueryLoggerDisruptor queryDisruptor; private PMetaData newEmptyMetaData() { return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps())); @@ -372,6 +376,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement ConfigUtil.setReplicationConfigIfAbsent(this.config); this.props = new ReadOnlyProps(this.config.iterator()); this.userName = connectionInfo.getPrincipal(); + this.user = connectionInfo.getUser(); this.latestMetaData = newEmptyMetaData(); // TODO: should we track connection wide memory usage or just org-wide usage? 
// If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate @@ -396,6 +401,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS); this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0); + try { + this.queryDisruptor = new QueryLoggerDisruptor(this.config); + } catch (SQLException e) { + logger.warn("Unable to initiate qeuery logging service !!"); + e.printStackTrace(); + } } @@ -477,6 +488,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } closed = true; GLOBAL_QUERY_SERVICES_COUNTER.decrement(); + try { + if (this.queryDisruptor != null) { + this.queryDisruptor.close(); + } + } catch (Exception e) { + // Ignore + } SQLException sqlE = null; try { // Attempt to return any unused sequences. @@ -2611,7 +2629,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA); } catch (TableAlreadyExistsException ignore) {} - + try { + metaConnection.createStatement().execute(QueryConstants.CREATE_LOG_METADATA); + } catch (TableAlreadyExistsException ignore) {} // Catch the IOException to log the error message and then bubble it up for the client to retry. 
try { createSysMutexTableIfNotExists(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps()); @@ -2966,6 +2986,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA); + } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} ConnectionQueryServicesImpl.this.upgradeRequired.set(false); success = true; } catch (UpgradeInProgressException | UpgradeNotRequiredException e) { @@ -4063,6 +4086,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public String getUserName() { return userName; } + + @Override + public User getUser() { + return user; + } private void checkClosed() { if (closed) { @@ -4488,4 +4516,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public Configuration getConfiguration() { return config; } + + @Override + public QueryLoggerDisruptor getQueryDisruptor() { + return this.queryDisruptor; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index c510b5a..ad354d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import 
org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -55,6 +56,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.FunctionNotFoundException; @@ -112,10 +114,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap(); private final GuidePostsCache guidePostsCache; private final Configuration config; + + private User user; public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) { super(services); userName = connInfo.getPrincipal(); + user = connInfo.getUser(); metaData = newEmptyMetaData(); // Use KeyValueBuilder that builds real KeyValues, as our test utils require this @@ -328,6 +333,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); } catch (NewerTableAlreadyExistsException ignore) { } + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA); + } catch (NewerTableAlreadyExistsException ignore) {} } catch (SQLException e) { sqlE = e; } finally { @@ -664,4 +672,14 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public Configuration getConfiguration() { return config; } + + @Override + public User getUser() { + return user; + } + + @Override + 
public QueryLoggerDisruptor getQueryDisruptor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 05d1af6..f5c8a59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.PColumn; @@ -351,4 +353,16 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public Configuration getConfiguration() { return getDelegate().getConfiguration(); } + + @Override + public User getUser() { + return getDelegate().getUser(); + } + + @Override + public QueryLoggerDisruptor getQueryDisruptor() { + return getDelegate().getQueryDisruptor(); + } + + } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 7607388..ae12e01 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -109,6 +109,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE; import java.math.BigDecimal; 
@@ -124,6 +138,7 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableProperty; /** @@ -395,10 +410,40 @@ public interface QueryConstants { // Install split policy to prevent a tenant's metadata from being split across regions. HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; + + public static final String CREATE_LOG_METADATA = + "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" + + // Pk columns + TENANT_ID + " VARCHAR ," + + QUERY_ID + " VARCHAR NOT NULL,\n" + + USER + " VARCHAR , \n" + + CLIENT_IP + " VARCHAR, \n" + + // Function metadata (will be null for argument row) + QUERY + " VARCHAR, \n" + + EXPLAIN_PLAN + " VARCHAR, \n" + + // Argument metadata (will be null for function row) + START_TIME + " TIMESTAMP, \n" + + TOTAL_EXECUTION_TIME + " BIGINT, \n" + + NO_OF_RESULTS_ITERATED + " BIGINT, \n" + + QUERY_STATUS + " VARCHAR, \n" + + EXCEPTION_TRACE + " VARCHAR, \n" + + GLOBAL_SCAN_DETAILS + " VARCHAR, \n" + + BIND_PARAMETERS + " VARCHAR, \n" + + SCAN_METRICS_JSON + " VARCHAR, \n" + + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (QUERY_ID))\n" + + HConstants.VERSIONS + "= " + MetaDataProtocol.DEFAULT_LOG_VERSIONS + ",\n" + + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+ + // Install split policy to prevent a tenant's metadata from being split across regions. 
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" + + HColumnDescriptor.TTL + "=" + MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+ + TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 0"; + public static final byte[] OFFSET_FAMILY = "f_offset".getBytes(); public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number"; public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 0b18aaa..43b9e5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -301,6 +301,10 @@ public interface QueryServices extends SQLCloseable { // Whether to enable cost-based-decision in the query optimizer public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled"; public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold"; + public static final String LOG_LEVEL = "phoenix.log.level"; + public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size"; + public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy"; + public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate"; /** * Get executor service used for parallel scans 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 961ab9f..58c9812 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -47,6 +47,8 @@ import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLE import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.LOG_LEVEL; +import static org.apache.phoenix.query.QueryServices.LOG_SAMPLE_RATE; import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; @@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableRefFactory; @@ -347,6 +350,8 @@ public class QueryServicesOptions { public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = 
false; + public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name(); + public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0"; private final Configuration config; @@ -428,7 +433,9 @@ public class QueryServicesOptions { .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED) .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING) - .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED); + .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED) + .setIfUnset(LOG_LEVEL, DEFAULT_LOGGING_LEVEL) + .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. http://git-wip-us.apache.org/repos/asf/phoenix/blob/0bc03684/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c69b6b1..87c2db6 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ <jackson.version>1.9.2</jackson.version> <antlr.version>3.5.2</antlr.version> <log4j.version>1.2.17</log4j.version> + <disruptor.version>3.3.6</disruptor.version> <slf4j.version>1.6.4</slf4j.version> <protobuf-java.version>2.5.0</protobuf-java.version> <commons-io.version>2.1</commons-io.version> @@ -952,6 +953,11 @@ <artifactId>javax.servlet-api</artifactId> <version>${servlet.api.version}</version> </dependency> + <dependency> + <groupId>com.lmax</groupId> + <artifactId>disruptor</artifactId> + <version>${disruptor.version}</version> + </dependency> </dependencies> </dependencyManagement>