Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 880cda16f -> 8d82a0ec7
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java new file mode 100644 index 0000000..c102855 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.log;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * {@link LogWriter} that persists each {@link RingBufferEvent} as one row of the
+ * system query-log table, using a dedicated HBase connection.
+ *
+ * <p>Initialisation is best effort: if the connection or table cannot be
+ * obtained, the failure is logged and every later {@link #write} call becomes
+ * a logged no-op rather than an error.
+ */
+public class TableLogWriter implements LogWriter {
+    private static final Log LOG = LogFactory.getLog(TableLogWriter.class);
+
+    private final Configuration config;
+    private Connection connection;
+    private Table table;
+    private boolean isClosed;
+
+    /**
+     * Opens an HBase connection and resolves the physical query-log table.
+     *
+     * @param configuration configuration used both to create the connection and
+     *        to resolve the physical table name
+     */
+    public TableLogWriter(Configuration configuration) {
+        this.config = configuration;
+        try {
+            this.connection = ConnectionFactory.createConnection(configuration);
+            this.table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
+                    SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config));
+        } catch (Exception e) {
+            // Query logging must never prevent the client from starting, so the
+            // failure is only reported -- but with its cause, so it can be diagnosed.
+            LOG.warn("Unable to initiate LogWriter for writing query logs to table", e);
+            // Do not keep a half-initialised connection open: write() would refuse
+            // to use it anyway because the table is missing.
+            closeQuietly();
+            this.table = null;
+            this.connection = null;
+        }
+    }
+
+    /**
+     * Writes the loggable attributes of {@code event} as a single {@link Put}
+     * keyed by the event's query id.
+     *
+     * <p>An attribute is written only when its log level does not exceed the
+     * connection's log level. The query status column is additionally written
+     * once the event has reached the COMPLETED or FAILED state.
+     *
+     * @param event the ring-buffer event to persist
+     * @throws SQLException if an attribute value cannot be converted to its column type
+     * @throws IOException if the put is rejected by HBase
+     */
+    @Override
+    public void write(RingBufferEvent event) throws SQLException, IOException {
+        if (isClosed()) {
+            LOG.warn("Unable to commit query log as Log committer is already closed");
+            return;
+        }
+        if (table == null || connection == null) {
+            LOG.warn("Unable to commit query log as connection was not initiated ");
+            return;
+        }
+
+        final int connectionLevel = event.getConnectionLogLevel().ordinal();
+        final ImmutableMap<QueryLogInfo, Object> queryInfo = event.getQueryInfo();
+        final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        final Put put = new Put(Bytes.toBytes(event.getQueryId()));
+
+        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
+            if (entry.getKey().logLevel.ordinal() <= connectionLevel) {
+                addColumn(put, entry.getKey(), entry.getValue(), ptr);
+            }
+        }
+
+        // The status is derived from the event state rather than from the
+        // attribute map, and only once the query has finished one way or the other.
+        boolean finished = event.getLogState() == QueryLogState.COMPLETED
+                || event.getLogState() == QueryLogState.FAILED;
+        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= connectionLevel && finished) {
+            addColumn(put, QueryLogInfo.QUERY_STATUS_I, event.getLogState().toString(), ptr);
+        }
+
+        // Empty key-value cell written alongside the data columns of every row.
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        table.put(put);
+    }
+
+    /**
+     * Marks the writer closed and releases the table and connection. Calling it
+     * again is a no-op. Close failures are logged rather than propagated, and a
+     * failure to close the table does not prevent the connection from being closed.
+     */
+    @Override
+    public void close() throws IOException {
+        if (isClosed()) {
+            return;
+        }
+        isClosed = true;
+        closeQuietly();
+    }
+
+    /** @return {@code true} once {@link #close()} has been called */
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    /** Serialises {@code value} with the column's data type and adds it to {@code put}. */
+    private static void addColumn(Put put, QueryLogInfo column, Object value, ImmutableBytesWritable ptr)
+            throws SQLException {
+        LiteralExpression expression = LiteralExpression.newConstant(value, column.dataType, Determinism.ALWAYS);
+        expression.evaluate(null, ptr);
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes(column.columnName),
+                ByteUtil.copyKeyBytesIfNecessary(ptr));
+    }
+
+    /** Closes the table and then the connection, logging (not throwing) any failure of either. */
+    private void closeQuietly() {
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close query log table", e);
+        }
+        try {
+            if (connection != null && !connection.isClosed()) {
+                connection.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to close connection used for query logging", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java index 4fd1194..c008635 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java @@ -1,27 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.monitoring; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; @@ -44,6 +38,8 @@ public class ReadMetricQueue { private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>(); + private final List<ScanMetricsHolder> scanMetricsHolderList = new ArrayList<ScanMetricsHolder>(); + private final boolean isRequestMetricsEnabled; public ReadMetricQueue(boolean isRequestMetricsEnabled) { @@ -85,7 +81,7 @@ public class ReadMetricQueue { } return publishedMetrics; } - + public void clearMetrics() { metricsMap.clear(); // help gc } @@ -177,8 +173,18 @@ public class ReadMetricQueue { return q; } - public boolean isRequestMetricsEnabled() { - return isRequestMetricsEnabled; - } + public boolean isRequestMetricsEnabled() { + return isRequestMetricsEnabled; + } + + public void addScanHolder(ScanMetricsHolder holder){ + scanMetricsHolderList.add(holder); + } + + public List<ScanMetricsHolder> getScanMetricsHolderList() { + return scanMetricsHolderList; + } + + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java index 6bcd402..9125cd8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java @@ -17,20 +17,23 @@ */ package org.apache.phoenix.monitoring; -import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; -import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS; import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS; import static org.apache.phoenix.monitoring.MetricType.COUNT_NOT_SERVING_REGION_EXCEPTION; -import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS; -import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES; +import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED; +import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED; +import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; +import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; -import org.apache.hadoop.hbase.client.Scan; +import java.io.IOException; +import java.util.Map; -import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; -import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES; -import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED; -import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED; +import 
org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.JsonMapper; public class ScanMetricsHolder { @@ -45,9 +48,11 @@ public class ScanMetricsHolder { private final CombinableMetric countOfRemoteRPCRetries; private final CombinableMetric countOfRowsScanned; private final CombinableMetric countOfRowsFiltered; + private Map<String, Long> scanMetricMap; + private Object scan; private static final ScanMetricsHolder NO_OP_INSTANCE = - new ScanMetricsHolder(new ReadMetricQueue(false), ""); + new ScanMetricsHolder(new ReadMetricQueue(false), "",null); public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName, Scan scan, boolean isRequestMetricsEnabled) { @@ -55,10 +60,12 @@ public class ScanMetricsHolder { return NO_OP_INSTANCE; } scan.setScanMetricsEnabled(true); - return new ScanMetricsHolder(readMetrics, tableName); + return new ScanMetricsHolder(readMetrics, tableName, scan); } - private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName) { + private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName,Scan scan) { + readMetrics.addScanHolder(this); + this.scan=scan; countOfRPCcalls = readMetrics.allotMetric(COUNT_RPC_CALLS, tableName); countOfRemoteRPCcalls = readMetrics.allotMetric(COUNT_REMOTE_RPC_CALLS, tableName); sumOfMillisSecBetweenNexts = readMetrics.allotMetric(COUNT_MILLS_BETWEEN_NEXTS, tableName); @@ -118,4 +125,21 @@ public class ScanMetricsHolder { return countOfRowsScanned; } + public Map<String, Long> getScanMetricMap() { + return scanMetricMap; + } + + public void setScanMetricMap(Map<String, Long> scanMetricMap) { + this.scanMetricMap = scanMetricMap; + } + + @Override + public String toString() { + try { + return "{\"scan\":" + scan + ", \"scanMetrics\":" + JsonMapper.writeObjectAsString(scanMetricMap) + "}"; + } catch (IOException e) { + return "{\"Exception while converting scan metrics to Json\":\"" + e.getMessage() + "\"}"; + } + } + } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 4a5c04e..f068770 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -147,4 +149,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated void upgradeSystemTables(String url, Properties props) throws SQLException; public Configuration getConfiguration(); + + public User getUser(); + + public QueryLoggerDisruptor getQueryDisruptor(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 128e1aa..4b04e74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -130,6 +130,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -189,6 +190,7 @@ import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.protobuf.ProtobufUtil; @@ -273,6 +275,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // don't need. 
private final ReadOnlyProps props; private final String userName; + private final User user; private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; private final GuidePostsCache tableStatsCache; @@ -342,6 +345,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; } }); + private QueryLoggerDisruptor queryDisruptor; private PMetaData newEmptyMetaData() { return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps())); @@ -378,6 +382,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement ConfigUtil.setReplicationConfigIfAbsent(this.config); this.props = new ReadOnlyProps(this.config.iterator()); this.userName = connectionInfo.getPrincipal(); + this.user = connectionInfo.getUser(); this.latestMetaData = newEmptyMetaData(); // TODO: should we track connection wide memory usage or just org-wide usage? 
// If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate @@ -402,6 +407,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS); this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0); + try { + this.queryDisruptor = new QueryLoggerDisruptor(this.config); + } catch (SQLException e) { + logger.warn("Unable to initiate qeuery logging service !!"); + e.printStackTrace(); + } } @@ -483,6 +494,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } closed = true; GLOBAL_QUERY_SERVICES_COUNTER.decrement(); + try { + if (this.queryDisruptor != null) { + this.queryDisruptor.close(); + } + } catch (Exception e) { + // Ignore + } SQLException sqlE = null; try { // Attempt to return any unused sequences. @@ -2641,7 +2659,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA); } catch (TableAlreadyExistsException ignore) {} - + try { + metaConnection.createStatement().execute(QueryConstants.CREATE_LOG_METADATA); + } catch (TableAlreadyExistsException ignore) {} // Catch the IOException to log the error message and then bubble it up for the client to retry. 
try { createSysMutexTableIfNotExists(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps()); @@ -2998,6 +3018,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA); + } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} ConnectionQueryServicesImpl.this.upgradeRequired.set(false); success = true; } catch (UpgradeInProgressException | UpgradeNotRequiredException e) { @@ -4091,6 +4114,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public String getUserName() { return userName; } + + @Override + public User getUser() { + return user; + } private void checkClosed() { if (closed) { @@ -4516,4 +4544,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public Configuration getConfiguration() { return config; } + + @Override + public QueryLoggerDisruptor getQueryDisruptor() { + return this.queryDisruptor; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 3984048..7694fd0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Table; import 
org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -57,6 +58,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.FunctionNotFoundException; @@ -114,10 +116,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap(); private final GuidePostsCache guidePostsCache; private final Configuration config; + + private User user; public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) { super(services); userName = connInfo.getPrincipal(); + user = connInfo.getUser(); metaData = newEmptyMetaData(); // Use KeyValueBuilder that builds real KeyValues, as our test utils require this @@ -332,6 +337,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); } catch (NewerTableAlreadyExistsException ignore) { } + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA); + } catch (NewerTableAlreadyExistsException ignore) {} } catch (SQLException e) { sqlE = e; } finally { @@ -668,4 +676,14 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public Configuration getConfiguration() { return config; } + + @Override + public User getUser() { + return user; + } + + 
@Override + public QueryLoggerDisruptor getQueryDisruptor() { + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 52b0c8d..2ab73b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -31,12 +31,14 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.log.QueryLoggerDisruptor; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.PColumn; @@ -352,4 +354,16 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public Configuration getConfiguration() { return getDelegate().getConfiguration(); } + + @Override + public User getUser() { + return getDelegate().getUser(); + } + + @Override + public QueryLoggerDisruptor getQueryDisruptor() { + return getDelegate().getQueryDisruptor(); + } + + } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 3a29821..c30d3d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -18,97 +18,7 @@ package org.apache.phoenix.query; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE; -import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; -import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_TABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SELF_REFERENCING_COL_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SEQUENCE_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE; 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.*; import java.math.BigDecimal; @@ -124,6 +34,7 @@ import org.apache.phoenix.schema.PName; import 
org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableProperty; /** @@ -396,10 +307,40 @@ public interface QueryConstants { // Install split policy to prevent a tenant's metadata from being split across regions. TableDescriptorBuilder.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; + + public static final String CREATE_LOG_METADATA = + "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE + "\"(\n" + + // Identifying columns (the primary key is QUERY_ID only) + TENANT_ID + " VARCHAR ," + + QUERY_ID + " VARCHAR NOT NULL,\n" + + USER + " VARCHAR , \n" + + CLIENT_IP + " VARCHAR, \n" + + // Query text and its explain plan + QUERY + " VARCHAR, \n" + + EXPLAIN_PLAN + " VARCHAR, \n" + + // Execution details: timing, result count, status, error trace, scan details, bind parameters and metrics + START_TIME + " TIMESTAMP, \n" + + TOTAL_EXECUTION_TIME + " BIGINT, \n" + + NO_OF_RESULTS_ITERATED + " BIGINT, \n" + + QUERY_STATUS + " VARCHAR, \n" + + EXCEPTION_TRACE + " VARCHAR, \n" + + GLOBAL_SCAN_DETAILS + " VARCHAR, \n" + + BIND_PARAMETERS + " VARCHAR, \n" + + SCAN_METRICS_JSON + " VARCHAR, \n" + + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (QUERY_ID))\n" + + HConstants.VERSIONS + "= " + MetaDataProtocol.DEFAULT_LOG_VERSIONS + ",\n" + + ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+ + // Install split policy to prevent a tenant's metadata from being split across regions. 
+ TableDescriptorBuilder.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" + + ColumnFamilyDescriptorBuilder.TTL + "=" + MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+ + TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 0"; + public static final byte[] OFFSET_FAMILY = "f_offset".getBytes(); public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number"; public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 6917c8f..f807182 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -299,6 +299,10 @@ public interface QueryServices extends SQLCloseable { // Whether to enable cost-based-decision in the query optimizer public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled"; public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold"; + public static final String LOG_LEVEL = "phoenix.log.level"; + public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size"; + public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy"; + public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate"; /** * Get executor service used for parallel scans 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 8caa891..427796f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -47,6 +47,8 @@ import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLE import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.LOG_LEVEL; +import static org.apache.phoenix.query.QueryServices.LOG_SAMPLE_RATE; import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; @@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableRefFactory; @@ -346,6 +349,8 @@ public class QueryServicesOptions { public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = 
false; + public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name(); + public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0"; private final Configuration config; @@ -427,7 +432,10 @@ public class QueryServicesOptions { .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION) .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING) - .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED); + .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED) + .setIfUnset(PHOENIX_ACLS_ENABLED, DEFAULT_PHOENIX_ACLS_ENABLED) + .setIfUnset(LOG_LEVEL, DEFAULT_LOGGING_LEVEL) + .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d82a0ec/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1ab0080..ad061e9 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ <jackson.version>1.9.2</jackson.version> <antlr.version>3.5.2</antlr.version> <log4j.version>1.2.17</log4j.version> + <disruptor.version>3.3.6</disruptor.version> <slf4j.version>1.6.4</slf4j.version> <protobuf-java.version>2.5.0</protobuf-java.version> <commons-io.version>2.1</commons-io.version> @@ -996,6 +997,11 @@ <version>${bouncycastle.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.lmax</groupId> + <artifactId>disruptor</artifactId> + <version>${disruptor.version}</version> + </dependency> </dependencies> </dependencyManagement>