PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0f6595c0 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0f6595c0 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0f6595c0 Branch: refs/heads/calcite Commit: 0f6595c0c511a3f07c51cf92d1ced665556b7d4c Parents: 9c069bd Author: Samarth <samarth.j...@salesforce.com> Authored: Fri Jun 26 16:44:43 2015 -0700 Committer: Samarth <samarth.j...@salesforce.com> Committed: Fri Jun 26 16:44:43 2015 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/PhoenixMetricsIT.java | 147 ---- .../apache/phoenix/execute/PartialCommitIT.java | 1 + .../phoenix/monitoring/PhoenixMetricsIT.java | 815 +++++++++++++++++++ .../apache/phoenix/cache/ServerCacheClient.java | 7 + .../apache/phoenix/compile/DeleteCompiler.java | 50 +- .../MutatingParallelIteratorFactory.java | 51 +- .../phoenix/compile/StatementContext.java | 49 +- .../apache/phoenix/compile/UpsertCompiler.java | 80 +- .../apache/phoenix/execute/AggregatePlan.java | 8 +- .../apache/phoenix/execute/HashJoinPlan.java | 7 + .../apache/phoenix/execute/MutationState.java | 290 ++++--- .../org/apache/phoenix/execute/UnionPlan.java | 8 +- .../phoenix/iterate/BaseResultIterators.java | 15 +- .../phoenix/iterate/ChunkedResultIterator.java | 21 +- .../iterate/ParallelIteratorFactory.java | 4 +- .../phoenix/iterate/ParallelIterators.java | 25 +- .../iterate/RoundRobinResultIterator.java | 4 +- .../phoenix/iterate/ScanningResultIterator.java | 38 +- .../apache/phoenix/iterate/SerialIterators.java | 23 +- .../phoenix/iterate/SpoolingResultIterator.java | 49 +- .../phoenix/iterate/TableResultIterator.java | 17 +- .../phoenix/iterate/UnionResultIterators.java | 70 +- .../apache/phoenix/jdbc/PhoenixConnection.java | 27 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 21 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 48 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 20 +- .../java/org/apache/phoenix/job/JobManager.java | 60 +- .../phoenix/mapreduce/CsvBulkLoadTool.java | 10 +- .../phoenix/mapreduce/PhoenixRecordReader.java | 12 +- .../phoenix/memory/GlobalMemoryManager.java | 5 - .../apache/phoenix/monitoring/AtomicMetric.java | 70 ++ .../phoenix/monitoring/CombinableMetric.java | 77 ++ .../monitoring/CombinableMetricImpl.java | 77 ++ .../org/apache/phoenix/monitoring/Counter.java | 85 -- .../phoenix/monitoring/GlobalClientMetrics.java | 117 +++ .../apache/phoenix/monitoring/GlobalMetric.java | 37 + .../phoenix/monitoring/GlobalMetricImpl.java | 74 ++ .../phoenix/monitoring/MemoryMetricsHolder.java | 43 + .../org/apache/phoenix/monitoring/Metric.java | 45 +- .../apache/phoenix/monitoring/MetricType.java | 55 ++ .../phoenix/monitoring/MetricsStopWatch.java | 59 ++ .../phoenix/monitoring/MutationMetricQueue.java | 131 +++ .../phoenix/monitoring/NonAtomicMetric.java | 71 ++ .../phoenix/monitoring/OverAllQueryMetrics.java | 121 +++ .../phoenix/monitoring/PhoenixMetrics.java | 118 --- .../phoenix/monitoring/ReadMetricQueue.java | 180 ++++ .../phoenix/monitoring/SizeStatistic.java | 78 -- .../monitoring/SpoolingMetricsHolder.java | 43 + .../monitoring/TaskExecutionMetricsHolder.java | 68 ++ .../phoenix/query/BaseQueryServicesImpl.java | 2 +- .../org/apache/phoenix/query/QueryServices.java | 3 +- .../phoenix/query/QueryServicesOptions.java | 25 +- .../phoenix/trace/PhoenixMetricsSink.java | 36 +- .../java/org/apache/phoenix/util/JDBCUtil.java | 6 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 175 +++- .../iterate/SpoolingResultIteratorTest.java | 4 +- 56 files changed, 2930 insertions(+), 852 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java deleted file mode 100644 index edb4042..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixMetricsIT.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; - -import org.apache.phoenix.monitoring.Metric; -import org.apache.phoenix.util.PhoenixRuntime; -import org.junit.Test; - -public class PhoenixMetricsIT extends BaseHBaseManagedTimeIT { - - @Test - public void testResetPhoenixMetrics() { - resetMetrics(); - for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) { - assertEquals(0, m.getTotalSum()); - assertEquals(0, m.getNumberOfSamples()); - } - } - - @Test - public void testPhoenixMetricsForQueries() throws Exception { - createTableAndInsertValues("T", true); - resetMetrics(); // we want to count metrics related only to the below query - Connection conn = DriverManager.getConnection(getUrl()); - String query = "SELECT * FROM T"; - ResultSet rs = conn.createStatement().executeQuery(query); - while (rs.next()) { - rs.getString(1); - rs.getString(2); - } - assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum()); - assertEquals(1, QUERY_COUNT.getMetric().getTotalSum()); - assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum()); - assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum()); - assertEquals(0, FAILED_QUERY.getMetric().getTotalSum()); - assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum()); - assertEquals(0, MUTATION_BATCH_SIZE.getMetric().getTotalSum()); - assertEquals(0, MUTATION_BYTES.getMetric().getTotalSum()); - assertEquals(0, MUTATION_COMMIT_TIME.getMetric().getTotalSum()); - - assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0); - assertTrue(QUERY_TIME.getMetric().getTotalSum() > 0); - } - - @Test - public void testPhoenixMetricsForMutations() throws Exception { - createTableAndInsertValues("T", true); - assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum()); - assertEquals(10, MUTATION_COUNT.getMetric().getTotalSum()); - assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0); - assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0); - assertEquals(0, PARALLEL_SCANS.getMetric().getTotalSum()); - assertEquals(0, QUERY_COUNT.getMetric().getTotalSum()); - assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum()); - assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum()); - assertEquals(0, FAILED_QUERY.getMetric().getTotalSum()); - assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum()); - } - - - @Test - public void testPhoenixMetricsForUpsertSelect() throws Exception { - createTableAndInsertValues("T", true); - resetMetrics(); - String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute(ddl); - resetMetrics(); - String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T"; - conn.createStatement().executeUpdate(dml); - conn.commit(); - assertEquals(10, MUTATION_BATCH_SIZE.getMetric().getTotalSum()); - assertEquals(1, MUTATION_COUNT.getMetric().getTotalSum()); - assertEquals(1, PARALLEL_SCANS.getMetric().getTotalSum()); - assertEquals(0, QUERY_TIME.getMetric().getTotalSum()); - assertTrue(SCAN_BYTES.getMetric().getTotalSum() > 0); - assertTrue(MUTATION_BYTES.getMetric().getTotalSum() > 0); - assertTrue(MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0); - assertEquals(0, QUERY_COUNT.getMetric().getTotalSum()); - assertEquals(0, REJECTED_TASK_COUNT.getMetric().getTotalSum()); - assertEquals(0, QUERY_TIMEOUT.getMetric().getTotalSum()); - assertEquals(0, FAILED_QUERY.getMetric().getTotalSum()); - assertEquals(0, NUM_SPOOL_FILE.getMetric().getTotalSum()); - } - - private static void resetMetrics() { - for (Metric m : PhoenixRuntime.getInternalPhoenixMetrics()) { - m.reset(); - } - } - - private static void createTableAndInsertValues(String tableName, boolean resetMetricsAfterTableCreate) throws Exception { - String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute(ddl); - if (resetMetricsAfterTableCreate) { - resetMetrics(); - } - // executing 10 upserts/mutations. - String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; - PreparedStatement stmt = conn.prepareStatement(dml); - for (int i = 1; i <= 10; i++) { - stmt.setString(1, "key" + i); - stmt.setString(2, "value" + i); - stmt.executeUpdate(); - } - conn.commit(); - } - - - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index c8696e2..e0f0a3c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -260,6 +260,7 @@ public class PartialCommitIT { PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); return new PhoenixConnection(phxCon) { + @Override protected MutationState newMutationState(int maxSize) { return new MutationState(maxSize, this, mutations); }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java new file mode 100644 index 0000000..d9ca8e8 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -0,0 +1,815 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME; +import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; +import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; +import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; +import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class PhoenixMetricsIT extends BaseOwnClusterHBaseManagedTimeIT { + + private static final List<String> mutationMetricsToSkip = Lists + .newArrayList(MetricType.MUTATION_COMMIT_TIME.name()); + private static final List<String> readMetricsToSkip = Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME.name(), + MetricType.TASK_EXECUTION_TIME.name(), MetricType.TASK_END_TO_END_TIME.name()); + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Enable request metric collection at the driver level + props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testResetGlobalPhoenixMetrics() { + resetGlobalMetrics(); + for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { + assertEquals(0, m.getTotalSum()); + assertEquals(0, m.getNumberOfSamples()); + } + } + + @Test + public void testGlobalPhoenixMetricsForQueries() throws Exception { + createTableAndInsertValues("T", true); + resetGlobalMetrics(); // we want to count metrics related only to the below query + Connection conn = DriverManager.getConnection(getUrl()); + String query = "SELECT * FROM T"; + ResultSet rs = conn.createStatement().executeQuery(query); + while (rs.next()) { + rs.getString(1); + rs.getString(2); + } + assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum()); + assertEquals(1, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_MUTATION_BYTES.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum()); + + assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_QUERY_TIME.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_TASK_END_TO_END_TIME.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_TASK_EXECUTION_TIME.getMetric().getTotalSum() > 0); + } + + @Test + public void testGlobalPhoenixMetricsForMutations() throws Exception { + createTableAndInsertValues("T", true); + assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum()); + assertEquals(10, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum()); + assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0); + assertEquals(0, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum()); + } + + @Test + public void testGlobalPhoenixMetricsForUpsertSelect() throws Exception { + createTableAndInsertValues("T", true); + resetGlobalMetrics(); + String ddl = "CREATE TABLE T2 (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + resetGlobalMetrics(); + String dml = "UPSERT INTO T2 (K, V) SELECT K, V FROM T"; + conn.createStatement().executeUpdate(dml); + conn.commit(); + assertEquals(10, GLOBAL_MUTATION_BATCH_SIZE.getMetric().getTotalSum()); + assertEquals(1, GLOBAL_MUTATION_SQL_COUNTER.getMetric().getTotalSum()); + assertEquals(1, GLOBAL_NUM_PARALLEL_SCANS.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_QUERY_TIME.getMetric().getTotalSum()); + assertTrue(GLOBAL_SCAN_BYTES.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_MUTATION_BYTES.getMetric().getTotalSum() > 0); + assertTrue(GLOBAL_MUTATION_COMMIT_TIME.getMetric().getTotalSum() > 0); + assertEquals(0, GLOBAL_SELECT_SQL_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_REJECTED_TASK_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_QUERY_TIMEOUT_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum()); + assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum()); + } + + private static void resetGlobalMetrics() { + for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { + m.reset(); + } + } + + private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate) + throws Exception { + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + if (resetGlobalMetricsAfterTableCreate) { + resetGlobalMetrics(); + } + // executing 10 upserts/mutations. + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= 10; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + } + + @Test + public void testOverallQueryMetricsForSelect() throws Exception { + String tableName = "SCANMETRICS"; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6"; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + } + + @Test + public void testReadMetricsForSelect() throws Exception { + String tableName = "READMETRICSFORSELECT"; + long numSaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + numSaltBuckets; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + + long numRows = 1000; + long numExpectedTasks = numSaltBuckets; + insertRowsInTable(tableName, numRows); + + String query = "SELECT * FROM " + tableName; + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + PhoenixResultSet resultSetBeingTested = rs.unwrap(PhoenixResultSet.class); + changeInternalStateForTesting(resultSetBeingTested); + while (resultSetBeingTested.next()) {} + resultSetBeingTested.close(); + Set<String> expectedTableNames = Sets.newHashSet(tableName); + assertReadMetricValuesForSelectSql(Lists.newArrayList(numRows), Lists.newArrayList(numExpectedTasks), + resultSetBeingTested, expectedTableNames); + } + + @Test + public void testMetricsForUpsert() throws Exception { + String tableName = "UPSERTMETRICS"; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 6"; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + + int numRows = 10; + Connection conn = insertRowsInTable(tableName, numRows); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) { + String t = entry.getKey(); + assertEquals("Table names didn't match!", tableName, t); + Map<String, Long> p = entry.getValue(); + assertEquals("There should have been three metrics", 3, p.size()); + boolean mutationBatchSizePresent = false; + boolean mutationCommitTimePresent = false; + boolean mutationBytesPresent = false; + for (Entry<String, Long> metric : p.entrySet()) { + String metricName = metric.getKey(); + long metricValue = metric.getValue(); + if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) { + assertEquals("Mutation batch sizes didn't match!", numRows, metricValue); + mutationBatchSizePresent = true; + } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) { + assertTrue("Mutation commit time should be greater than zero", metricValue > 0); + mutationCommitTimePresent = true; + } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) { + assertTrue("Mutation bytes size should be greater than zero", metricValue > 0); + mutationBytesPresent = true; + } + } + assertTrue(mutationBatchSizePresent); + assertTrue(mutationCommitTimePresent); + assertTrue(mutationBytesPresent); + } + Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + assertEquals("Read metrics should be empty", 0, readMetrics.size()); + } + + @Test + public void testMetricsForUpsertSelect() throws Exception { + String tableName1 = "UPSERTFROM"; + long table1SaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + table1SaltBuckets; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + int numRows = 10; + insertRowsInTable(tableName1, numRows); + + String tableName2 = "UPSERTTO"; + ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10"; + ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + + Connection conn = DriverManager.getConnection(getUrl()); + String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; + conn.createStatement().executeUpdate(upsertSelect); + conn.commit(); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + + Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + assertMutationMetrics(tableName2, numRows, mutationMetrics); + Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + assertReadMetricsForMutatingSql(tableName1, table1SaltBuckets, readMetrics); + } + + @Test + public void testMetricsForDelete() throws Exception { + String tableName = "DELETEMETRICS"; + long tableSaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + tableSaltBuckets; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + int numRows = 10; + insertRowsInTable(tableName, numRows); + Connection conn = DriverManager.getConnection(getUrl()); + String delete = "DELETE FROM " + tableName; + conn.createStatement().execute(delete); + conn.commit(); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + assertMutationMetrics(tableName, numRows, mutationMetrics); + + Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + assertReadMetricsForMutatingSql(tableName, tableSaltBuckets, readMetrics); + } + + @Test + public void testNoMetricsCollectedForConnection() throws Exception { + String tableName = "NOMETRICS"; + long tableSaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + tableSaltBuckets; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + int numRows = 10; + insertRowsInTable(tableName, numRows); + Properties props = new Properties(); + props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "false"); + Connection conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); + while (rs.next()) {} + rs.close(); + Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getRequestReadMetrics(rs); + assertTrue("No read metrics should have been generated", readMetrics.size() == 0); + conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " VALUES ('KEY', 'VALUE')"); + conn.commit(); + Map<String, Map<String, Long>> writeMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + assertTrue("No write metrics should have been generated", writeMetrics.size() == 0); + } + + @Test + public void testMetricsForUpsertWithAutoCommit() throws Exception { + String tableName = "VERIFYUPSERTAUTOCOMMIT"; + long tableSaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + tableSaltBuckets; + try (Connection ddlConn = DriverManager.getConnection(getUrl())) { + ddlConn.createStatement().execute(ddl); + } + + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + int numRows = 10; + Map<String, Map<String, Long>> mutationMetricsForAutoCommitOff = null; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + upsertRows(upsert, numRows, conn); + conn.commit(); + mutationMetricsForAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + } + + // Insert rows now with auto-commit on + Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + upsertRows(upsert, numRows, conn); + mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + } + // Verify that the mutation metrics are same for both cases + assertMetricsAreSame(mutationMetricsForAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); + } + + private void upsertRows(String upsert, int numRows, Connection conn) throws SQLException { + PreparedStatement stmt = conn.prepareStatement(upsert); + for (int i = 1; i <= numRows; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + } + + @Test + public void testMetricsForDeleteWithAutoCommit() throws Exception { + String tableName = "VERIFYDELETEAUTOCOMMIT"; + long tableSaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + tableSaltBuckets; + try (Connection ddlConn = DriverManager.getConnection(getUrl())) { + ddlConn.createStatement().execute(ddl); + } + + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + int numRows = 10; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + upsertRows(upsert, numRows, conn); + conn.commit(); + } + + String delete = "DELETE FROM " + tableName; + // Delete rows now with auto-commit off + Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOff = null; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + conn.createStatement().executeUpdate(delete); + deleteMetricsWithAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + } + + // Upsert the rows back + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + upsertRows(upsert, numRows, conn); + conn.commit(); + } + + // Now delete rows with auto-commit on + Map<String, Map<String, Long>> deleteMetricsWithAutoCommitOn = null; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(delete); + deleteMetricsWithAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + } + + // Verify that the mutation metrics are same for both cases. + assertMetricsAreSame(deleteMetricsWithAutoCommitOff, deleteMetricsWithAutoCommitOn, mutationMetricsToSkip); + } + + @Test + public void testMetricsForUpsertSelectWithAutoCommit() throws Exception { + String tableName1 = "UPSERTFROMAUTOCOMMIT"; + long table1SaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + table1SaltBuckets; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + int numRows = 10; + insertRowsInTable(tableName1, numRows); + + String tableName2 = "UPSERTTOAUTCOMMIT"; + ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10"; + ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + + String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; + + Map<String, Map<String, Long>> mutationMetricsAutoCommitOff = null; + Map<String, Map<String, Long>> readMetricsAutoCommitOff = null; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + conn.createStatement().executeUpdate(upsertSelect); + conn.commit(); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + mutationMetricsAutoCommitOff = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + readMetricsAutoCommitOff = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + } + + Map<String, Map<String, Long>> mutationMetricsAutoCommitOn = null; + Map<String, Map<String, Long>> readMetricsAutoCommitOn = null; + + int autoCommitBatchSize = numRows + 1; // batchsize = 11 is less than numRows and is not a divisor of batchsize + Properties props = new Properties(); + props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(upsertSelect); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + } + assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); + assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip); + + autoCommitBatchSize = numRows - 1; // batchsize = 9 is less than numRows and is not a divisor of batchsize + props = new Properties(); + props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(upsertSelect); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + } + assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); + assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip); + + autoCommitBatchSize = numRows; + props = new Properties(); + props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(upsertSelect); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + } + assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); + assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOn, readMetricsToSkip); + + autoCommitBatchSize = 2; // multiple batches of equal size + props = new Properties(); + props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(autoCommitBatchSize)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(upsertSelect); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + mutationMetricsAutoCommitOn = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + readMetricsAutoCommitOn = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + } + assertMetricsAreSame(mutationMetricsAutoCommitOff, mutationMetricsAutoCommitOn, mutationMetricsToSkip); + assertMetricsAreSame(readMetricsAutoCommitOff, readMetricsAutoCommitOff, readMetricsToSkip); + } + + @Test + public void testMutationMetricsWhenUpsertingToMultipleTables() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String table1 = "TABLE1"; + createTableAndInsertValues(true, 10, conn, table1); + String table2 = "TABLE2"; + createTableAndInsertValues(true, 10, conn, table2); + String table3 = "TABLE3"; + createTableAndInsertValues(true, 10, conn, table3); + Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + assertTrue("Mutation metrics not present for " + table1, mutationMetrics.get(table1) != null); + assertTrue("Mutation metrics not present for " + table2, mutationMetrics.get(table2) != null); + assertTrue("Mutation metrics not present for " + table3, mutationMetrics.get(table3) != null); + assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table2), mutationMetricsToSkip); + assertMetricsHaveSameValues(mutationMetrics.get(table1), mutationMetrics.get(table3), mutationMetricsToSkip); + } + } + + @Test + public void testClosingConnectionClearsMetrics() throws Exception { + Connection conn = null; + try { + conn = DriverManager.getConnection(getUrl()); + createTableAndInsertValues(true, 10, conn, "clearmetrics"); + assertTrue("Mutation metrics not present", PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() > 0); + } finally { + if (conn != null) { + conn.close(); + assertTrue("Closing connection didn't clear metrics", + PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn).size() == 0); + } + } + } + + @Test + public void testMetricsForUpsertingIntoImmutableTableWithIndices() throws Exception { + String dataTable = "IMMTABLEWITHINDICES"; + String tableDdl = "CREATE TABLE " + + dataTable + + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER, V3 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(K1, K2)) IMMUTABLE_ROWS = true"; + String index1 = "I1"; + String index1Ddl = "CREATE INDEX " + index1 + " ON " + dataTable + " (V1) include (V2)"; + String index2 = "I2"; + String index2Ddl = "CREATE INDEX " + index2 + " ON " + dataTable + " (V2) include (V3)"; + String index3 = "I3"; + String index3Ddl = "CREATE INDEX " + index3 + " ON " + dataTable + " (V3) include (V1)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(tableDdl); + conn.createStatement().execute(index1Ddl); + conn.createStatement().execute(index2Ddl); + conn.createStatement().execute(index3Ddl); + } + String upsert = "UPSERT INTO " + dataTable + " VALUES (?, ?, ?, ?, ?)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + /* + * Upsert data into table. Because the table is immutable, mutations for updating the indices on it are + * handled by the client itself. So mutation metrics should include mutations for the indices as well as the + * data table. + */ + PreparedStatement stmt = conn.prepareStatement(upsert); + for (int i = 1; i < 10; i++) { + stmt.setString(1, "key1" + i); + stmt.setString(2, "key2" + i); + stmt.setInt(3, i); + stmt.setInt(4, i); + stmt.setInt(5, i); + stmt.executeUpdate(); + } + conn.commit(); + Map<String, Map<String, Long>> metrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(conn); + assertTrue(metrics.get(dataTable).size() > 0); + assertTrue(metrics.get(index1).size() > 0); + assertTrue(metrics.get(index2).size() > 0); + assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index2), mutationMetricsToSkip); + assertTrue(metrics.get(index3).size() > 0); + assertMetricsHaveSameValues(metrics.get(index1), metrics.get(index3), mutationMetricsToSkip); + } + } + + @Test + public void testMetricsForUpsertSelectSameTable() throws Exception { + String tableName = "UPSERTSAME"; + long table1SaltBuckets = 6; + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " + + table1SaltBuckets; + Connection ddlConn = DriverManager.getConnection(getUrl()); + ddlConn.createStatement().execute(ddl); + ddlConn.close(); + int numRows = 10; + insertRowsInTable(tableName, numRows); + + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(false); + String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName; + conn.createStatement().executeUpdate(upsertSelect); + conn.commit(); + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + + Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + // Because auto-commit is off, upsert select into the same table will run on the client. + // So we should have client side read and write metrics available. + assertMutationMetrics(tableName, numRows, mutationMetrics); + Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics); + PhoenixRuntime.resetMetrics(pConn); + // With autocommit on, still, this upsert select runs on the client side. + conn.setAutoCommit(true); + conn.createStatement().executeUpdate(upsertSelect); + Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); + Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); + assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip); + assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip); + } + + private void createTableAndInsertValues(boolean commit, int numRows, Connection conn, String tableName) + throws SQLException { + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + conn.createStatement().execute(ddl); + // executing 10 upserts/mutations. + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= numRows; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + if (commit) { + conn.commit(); + } + } + + private void assertMetricsAreSame(Map<String, Map<String, Long>> metric1, Map<String, Map<String, Long>> metric2, + List<String> metricsToSkip) { + assertTrue("The two metrics have different or unequal number of table names ", + metric1.keySet().equals(metric2.keySet())); + for (Entry<String, Map<String, Long>> entry : metric1.entrySet()) { + Map<String, Long> metricNameValueMap1 = entry.getValue(); + Map<String, Long> metricNameValueMap2 = metric2.get(entry.getKey()); + assertMetricsHaveSameValues(metricNameValueMap1, metricNameValueMap2, metricsToSkip); + } + } + + private void assertMetricsHaveSameValues(Map<String, Long> metricNameValueMap1, + Map<String, Long> metricNameValueMap2, List<String> metricsToSkip) { + assertTrue("The two metrics have different or unequal number of metric names ", metricNameValueMap1.keySet() + .equals(metricNameValueMap2.keySet())); + for (Entry<String, Long> entry : metricNameValueMap1.entrySet()) { + String metricName = entry.getKey(); + if (!metricsToSkip.contains(metricName)) { + assertEquals("Unequal values for metric " + metricName, entry.getValue(), + metricNameValueMap2.get(metricName)); + } + } + } + + private void changeInternalStateForTesting(PhoenixResultSet rs) { + // get and set the internal state for testing purposes. + ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(true); + StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context"); + Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue); + Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue); + } + + private void assertReadMetricValuesForSelectSql(ArrayList<Long> numRows, ArrayList<Long> numExpectedTasks, + PhoenixResultSet resultSetBeingTested, Set<String> expectedTableNames) throws SQLException { + Map<String, Map<String, Long>> metrics = PhoenixRuntime.getRequestReadMetrics(resultSetBeingTested); + int counter = 0; + for (Entry<String, Map<String, Long>> entry : metrics.entrySet()) { + String tableName = entry.getKey(); + expectedTableNames.remove(tableName); + Map<String, Long> metricValues = entry.getValue(); + boolean scanMetricsPresent = false; + boolean taskCounterMetricsPresent = false; + boolean taskExecutionTimeMetricsPresent = false; + boolean memoryMetricsPresent = false; + for (Entry<String, Long> pair : metricValues.entrySet()) { + String metricName = pair.getKey(); + long metricValue = pair.getValue(); + long n = numRows.get(counter); + long numTask = numExpectedTasks.get(counter); + if (metricName.equals(SCAN_BYTES.name())) { + // we are using a SCAN_BYTES_DELTA of 1. So number of scan bytes read should be number of rows read + assertEquals(n, metricValue); + scanMetricsPresent = true; + } else if (metricName.equals(TASK_EXECUTED_COUNTER.name())) { + assertEquals(numTask, metricValue); + taskCounterMetricsPresent = true; + } else if (metricName.equals(TASK_EXECUTION_TIME.name())) { + assertEquals(numTask * TASK_EXECUTION_TIME_DELTA, metricValue); + taskExecutionTimeMetricsPresent = true; + } else if (metricName.equals(MEMORY_CHUNK_BYTES.name())) { + assertEquals(numTask * MEMORY_CHUNK_BYTES_DELTA, metricValue); + memoryMetricsPresent = true; + } + } + counter++; + assertTrue(scanMetricsPresent); + assertTrue(taskCounterMetricsPresent); + assertTrue(taskExecutionTimeMetricsPresent); + assertTrue(memoryMetricsPresent); + } + PhoenixRuntime.resetMetrics(resultSetBeingTested); + assertTrue("Metrics not found tables " + Joiner.on(",").join(expectedTableNames), + expectedTableNames.size() == 0); + } + + private Connection insertRowsInTable(String tableName, long numRows) throws SQLException { + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= numRows; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + return conn; + } + + // number of records read should be number of bytes at the end + public static final long SCAN_BYTES_DELTA = 1; + + // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA + public static final long TASK_EXECUTION_TIME_DELTA = 10; + + // total task execution time should be numTasks * TASK_EXECUTION_TIME_DELTA + public static final long MEMORY_CHUNK_BYTES_DELTA = 100; + + private class TestReadMetricsQueue extends ReadMetricQueue { + + public TestReadMetricsQueue(boolean isRequestMetricsEnabled) { + super(isRequestMetricsEnabled); + } + + @Override + public CombinableMetric getMetric(MetricType type) { + switch (type) { + case SCAN_BYTES: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + super.change(SCAN_BYTES_DELTA); + } + }; + case TASK_EXECUTION_TIME: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + super.change(TASK_EXECUTION_TIME_DELTA); + } + }; + case MEMORY_CHUNK_BYTES: + return new CombinableMetricImpl(type) { + + @Override + public void change(long delta) { + super.change(MEMORY_CHUNK_BYTES_DELTA); + } + }; + } + return super.getMetric(type); + } + } + + private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets, + Map<String, Map<String, Long>> readMetrics) { + assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0); + int numTables = 0; + for (Entry<String, Map<String, Long>> entry : readMetrics.entrySet()) { + String t = entry.getKey(); + assertEquals("Table name didn't match for read metrics", tableName, t); + numTables++; + Map<String, Long> p = entry.getValue(); + assertTrue("No read metrics present when there should have been", p.size() > 0); + for (Entry<String, Long> metric : p.entrySet()) { + String metricName = metric.getKey(); + long metricValue = metric.getValue(); + if (metricName.equals(TASK_EXECUTED_COUNTER.name())) { + assertEquals(tableSaltBuckets, metricValue); + } else if (metricName.equals(SCAN_BYTES.name())) { + assertTrue("Scan bytes read should be greater than zero", metricValue > 0); + } + } + } + assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables); + } + + private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<String, Long>> mutationMetrics) { + assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0); + for (Entry<String, Map<String, Long>> entry : mutationMetrics.entrySet()) { + String t = entry.getKey(); + assertEquals("Table name didn't match for mutation metrics", tableName, t); + Map<String, Long> p = entry.getValue(); + assertEquals("There should have been three metrics", 3, p.size()); + for (Entry<String, Long> metric : p.entrySet()) { + String metricName = metric.getKey(); + long metricValue = metric.getValue(); + if (metricName.equals(MetricType.MUTATION_BATCH_SIZE.name())) { + assertEquals("Mutation batch sizes didn't match!", numRows, metricValue); + } else if (metricName.equals(MetricType.MUTATION_COMMIT_TIME.name())) { + assertTrue("Mutation commit time should be greater than zero", metricValue > 0); + } else if (metricName.equals(MetricType.MUTATION_BYTES.name())) { + assertTrue("Mutation bytes size should be greater than zero", metricValue > 0); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 9718709..9ad9ef5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.cache; +import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE; import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import java.io.Closeable; @@ -57,6 +58,7 @@ import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -226,6 +228,11 @@ public class ServerCacheClient { public Object getJobId() { return ServerCacheClient.this; } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return NO_OP_INSTANCE; + } })); } else { if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 575f0f3..a28f614 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -94,8 +94,9 @@ public class DeleteCompiler { this.statement = statement; } - private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { PTable table = targetTableRef.getTable(); + PhoenixStatement statement = childContext.getStatement(); PhoenixConnection connection = statement.getConnection(); PName tenantId = connection.getTenantId(); byte[] tenantIdBytes = null; @@ -114,19 +115,18 @@ public class DeleteCompiler { if (indexTableRef != null) { indexMutations = Maps.newHashMapWithExpectedSize(batchSize); } - try { - List<PColumn> pkColumns = table.getPKColumns(); - boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; - boolean isSharedViewIndex = table.getViewIndexId() != null; - int offset = (table.getBucketNum() == null ? 0 : 1); - byte[][] values = new byte[pkColumns.size()][]; - if (isMultiTenant) { - values[offset++] = tenantIdBytes; - } - if (isSharedViewIndex) { - values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); - } - PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, statement); + List<PColumn> pkColumns = table.getPKColumns(); + boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; + boolean isSharedViewIndex = table.getViewIndexId() != null; + int offset = (table.getBucketNum() == null ? 0 : 1); + byte[][] values = new byte[pkColumns.size()][]; + if (isMultiTenant) { + values[offset++] = tenantIdBytes; + } + if (isSharedViewIndex) { + values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); + } + try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { int rowCount = 0; while (rs.next()) { ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map @@ -183,8 +183,6 @@ public class DeleteCompiler { state.join(indexState); } return state; - } finally { - iterator.close(); } } @@ -199,9 +197,16 @@ public class DeleteCompiler { } @Override - protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException { + protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException { PhoenixStatement statement = new PhoenixStatement(connection); - return deleteRows(statement, targetTableRef, indexTableRef, iterator, projector, sourceTableRef); + /* + * We don't want to collect any read metrics within the child context. This is because any read metrics that + * need to be captured are already getting collected in the parent statement context enclosed in the result + * iterator being used for reading rows out. + */ + StatementContext ctx = new StatementContext(statement, false); + MutationState state = deleteRows(ctx, targetTableRef, indexTableRef, iterator, projector, sourceTableRef); + return state; } public void setTargetTableRef(TableRef tableRef) { @@ -559,9 +564,14 @@ public class DeleteCompiler { } // Return total number of rows that have been delete. In the case of auto commit being off // the mutations will all be in the mutation state of the current connection. - return new MutationState(maxSize, connection, totalRowCount); + MutationState state = new MutationState(maxSize, connection, totalRowCount); + + // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. + state.setReadMetricQueue(plan.getContext().getReadMetricsQueue()); + + return state; } else { - return deleteRows(statement, tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); + return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index bcac17d..630760c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -35,9 +35,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.KeyValueUtil; /** @@ -53,21 +53,34 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato /** * Method that does the actual mutation work */ - abstract protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException; + abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException; @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator iterator, Scan scan) throws SQLException { - final PhoenixConnection connection = new PhoenixConnection(this.connection); - MutationState state = mutate(context, iterator, connection); + public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException { + final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection); + + MutationState state = mutate(parentContext, iterator, clonedConnection); + long totalRowCount = state.getUpdateCount(); - if (connection.getAutoCommit()) { - connection.getMutationState().join(state); - connection.commit(); - ConnectionQueryServices services = connection.getQueryServices(); - int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - state = new MutationState(maxSize, connection, totalRowCount); + if (clonedConnection.getAutoCommit()) { + clonedConnection.getMutationState().join(state); + clonedConnection.commit(); + ConnectionQueryServices services = clonedConnection.getQueryServices(); + int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + /* + * Everything that was mutated as part of the clonedConnection has been committed. However, we want to + * report the mutation work done using this clonedConnection as part of the overall mutation work of the + * parent connection. So we need to set those metrics in the empty mutation state so that they could be + * combined with the parent connection's mutation metrics (as part of combining mutation state) in the + * close() method of the iterator being returned. Don't combine the read metrics in parent context yet + * though because they are possibly being concurrently modified by other threads at this stage. Instead we + * will get hold of the read metrics when all the mutating iterators are done. + */ + state = MutationState.emptyMutationState(maxSize, clonedConnection); + state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue()); } final MutationState finalState = state; + byte[] value = PLong.INSTANCE.toBytes(totalRowCount); KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); final Tuple tuple = new SingleKeyValueTuple(keyValue); @@ -90,13 +103,17 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato @Override public void close() throws SQLException { try { - // Join the child mutation states in close, since this is called in a single threaded manner - // after the parallel results have been processed. - if (!connection.getAutoCommit()) { - MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState); - } + /* + * Join the child mutation states in close, since this is called in a single threaded manner + * after the parallel results have been processed. + * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation + * state (with no mutations). However, it still has the metrics for mutation work done by the + * mutating-iterator. Joining the mutation state makes sure those metrics are passed over + * to the parent connection. + */ + MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState); } finally { - connection.close(); + clonedConnection.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java index d726488..52bb7f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -41,6 +43,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.NumberUtil; +import org.apache.phoenix.util.ReadOnlyProps; import com.google.common.collect.Maps; @@ -80,10 +83,19 @@ public class StatementContext { private TimeRange scanTimeRange = null; private Map<SelectStatement, Object> subqueryResults; - + private final ReadMetricQueue readMetricsQueue; + private final OverAllQueryMetrics overAllQueryMetrics; + public StatementContext(PhoenixStatement statement) { this(statement, new Scan()); } + + /** + * Constructor that lets you override whether or not to collect request level metrics. + */ + public StatementContext(PhoenixStatement statement, boolean collectRequestLevelMetrics) { + this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement), collectRequestLevelMetrics); + } public StatementContext(PhoenixStatement statement, Scan scan) { this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan(), new SequenceManager(statement)); @@ -94,6 +106,10 @@ public class StatementContext { } public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager) { + this(statement, resolver, scan, seqManager, statement.getConnection().isRequestLevelMetricsEnabled()); + } + + public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Scan scan, SequenceManager seqManager, boolean isRequestMetricsEnabled) { this.statement = statement; this.resolver = resolver; this.scan = scan; @@ -102,20 +118,24 @@ public class StatementContext { this.aggregates = new AggregationManager(); this.expressions = new ExpressionManager(); PhoenixConnection connection = statement.getConnection(); - this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); + ReadOnlyProps props = connection.getQueryServices().getProps(); + this.dateFormat = props.get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); this.dateFormatter = DateUtil.getDateFormatter(dateFormat); - this.timeFormat = connection.getQueryServices().getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); + this.timeFormat = props.get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); this.timeFormatter = DateUtil.getTimeFormatter(timeFormat); - this.timestampFormat = connection.getQueryServices().getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); + this.timestampFormat = props.get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); this.timestampFormatter = DateUtil.getTimestampFormatter(timestampFormat); - this.dateFormatTimeZone = TimeZone.getTimeZone( - connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, DateUtil.DEFAULT_TIME_ZONE_ID)); - this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT); + this.dateFormatTimeZone = TimeZone.getTimeZone(props.get(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB, + DateUtil.DEFAULT_TIME_ZONE_ID)); + this.numberFormat = props.get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT); this.tempPtr = new ImmutableBytesWritable(); this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null; - this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>(); - this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap(); - this.subqueryResults = Maps.<SelectStatement, Object>newHashMap(); + this.whereConditionColumns = new ArrayList<Pair<byte[], byte[]>>(); + this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps + .<PColumn, Integer> newLinkedHashMap(); + this.subqueryResults = Maps.<SelectStatement, Object> newHashMap(); + this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled); + this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled); } /** @@ -285,4 +305,13 @@ public class StatementContext { public void setSubqueryResult(SelectStatement select, Object result) { subqueryResults.put(select, result); } + + public ReadMetricQueue getReadMetricsQueue() { + return readMetricsQueue; + } + + public OverAllQueryMetrics getOverallQueryMetrics() { + return overAllQueryMetrics; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 2b35d4f..7b39a28 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -118,43 +118,40 @@ public class UpsertCompiler { mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter())); } - private static MutationState upsertSelect(PhoenixStatement statement, - TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes, - int[] pkSlotIndexes) throws SQLException { - try { - PhoenixConnection connection = statement.getConnection(); - ConnectionQueryServices services = connection.getQueryServices(); - int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - boolean isAutoCommit = connection.getAutoCommit(); - byte[][] values = new byte[columnIndexes.length][]; - int rowCount = 0; - Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); - PTable table = tableRef.getTable(); - ResultSet rs = new PhoenixResultSet(iterator, projector, statement); + private static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, + ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes) throws SQLException { + PhoenixStatement statement = childContext.getStatement(); + PhoenixConnection connection = statement.getConnection(); + ConnectionQueryServices services = connection.getQueryServices(); + int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + boolean isAutoCommit = connection.getAutoCommit(); + byte[][] values = new byte[columnIndexes.length][]; + int rowCount = 0; + Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); + PTable table = tableRef.getTable(); + try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); while (rs.next()) { for (int i = 0; i < values.length; i++) { PColumn column = table.getColumns().get(columnIndexes[i]); - byte[] bytes = rs.getBytes(i+1); + byte[] bytes = rs.getBytes(i + 1); ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes); - Object value = rs.getObject(i+1); - int rsPrecision = rs.getMetaData().getPrecision(i+1); + Object value = rs.getObject(i + 1); + int rsPrecision = rs.getMetaData().getPrecision(i + 1); Integer precision = rsPrecision == 0 ? null : rsPrecision; - int rsScale = rs.getMetaData().getScale(i+1); + int rsScale = rs.getMetaData().getScale(i + 1); Integer scale = rsScale == 0 ? null : rsScale; // We are guaranteed that the two column will have compatible types, // as we checked that before. - if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), - precision, scale, - column.getMaxLength(),column.getScale())) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY) - .setColumnName(column.getName().getString()) - .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build().buildException(); - } - column.getDataType().coerceBytes(ptr, value, column.getDataType(), - precision, scale, SortOrder.getDefault(), - column.getMaxLength(), column.getScale(), column.getSortOrder()); + if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale, + column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString()) + .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build() + .buildException(); } + column.getDataType().coerceBytes(ptr, value, column.getDataType(), precision, scale, + SortOrder.getDefault(), column.getMaxLength(), column.getScale(), column.getSortOrder()); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); } setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement); @@ -169,8 +166,6 @@ public class UpsertCompiler { } // If auto commit is true, this last batch will be committed upon return return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); - } finally { - iterator.close(); } } @@ -186,14 +181,21 @@ public class UpsertCompiler { } @Override - protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException { - if (context.getSequenceManager().getSequenceCount() > 0) { + protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException { + if (parentContext.getSequenceManager().getSequenceCount() > 0) { throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced"); } PhoenixStatement statement = new PhoenixStatement(connection); + /* + * We don't want to collect any read metrics within the child context. This is because any read metrics that + * need to be captured are already getting collected in the parent statement context enclosed in the result + * iterator being used for reading rows out. + */ + StatementContext childContext = new StatementContext(statement, false); // Clone the row projector as it's not thread safe and would be used simultaneously by // multiple threads otherwise. - return upsertSelect(statement, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes); + MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes); + return state; } public void setRowProjector(RowProjector projector) { @@ -669,7 +671,7 @@ public class UpsertCompiler { public MutationState execute() throws SQLException { ResultIterator iterator = queryPlan.iterator(); if (parallelIteratorFactory == null) { - return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes); + return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes); } try { parallelIteratorFactory.setRowProjector(projector); @@ -677,13 +679,21 @@ public class UpsertCompiler { parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes); Tuple tuple; long totalRowCount = 0; + StatementContext context = queryPlan.getContext(); while ((tuple=iterator.next()) != null) {// Runs query Cell kv = tuple.getValue(0); totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); } // Return total number of rows that have been updated. In the case of auto commit being off // the mutations will all be in the mutation state of the current connection. - return new MutationState(maxSize, statement.getConnection(), totalRowCount); + MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount); + /* + * All the metrics collected for measuring the reads done by the parallel mutating iterators + * is included in the ReadMetricHolder of the statement context. Include these metrics in the + * returned mutation state so they can be published on commit. + */ + mutationState.setReadMetricQueue(context.getReadMetricsQueue()); + return mutationState; } finally { iterator.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index ba137f8..00e843d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -102,7 +102,7 @@ public class AggregatePlan extends BaseQueryPlan { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { Expression expression = RowKeyExpression.INSTANCE; OrderByExpression orderByExpression = new OrderByExpression(expression, false, true); int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); @@ -119,9 +119,9 @@ public class AggregatePlan extends BaseQueryPlan { this.outerFactory = outerFactory; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { - PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan); - return outerFactory.newIterator(context, iterator, scan); + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { + PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName); + return outerFactory.newIterator(context, iterator, scan, tableName); } }