Repository: phoenix
Updated Branches:
  refs/heads/master 10efdb1f2 -> 6b0461002


Phoenix-1264 : Add StatisticsCollector to existing tables on first connection to cluster


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b046100
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b046100
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b046100

Branch: refs/heads/master
Commit: 6b04610022415fcc27ea69fe001cbd464badf355
Parents: 10efdb1
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Fri Sep 26 11:21:40 2014 +0530
Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Committed: Fri Sep 26 11:21:40 2014 +0530

----------------------------------------------------------------------
 ...efaultParallelIteratorsRegionSplitterIT.java |  15 ++
 .../phoenix/end2end/GuidePostsLifeCycleIT.java  |  22 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  15 ++
 .../phoenix/end2end/MultiCfQueryExecIT.java     |  14 ++
 .../phoenix/end2end/StatsCollectorIT.java       |  44 ++--
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../UngroupedAggregateRegionObserver.java       |  72 +++++-
 .../DefaultParallelIteratorRegionSplitter.java  |  30 +--
 .../phoenix/query/ConnectionQueryServices.java  |   3 -
 .../query/ConnectionQueryServicesImpl.java      |  51 +---
 .../query/ConnectionlessQueryServicesImpl.java  |   6 -
 .../query/DelegateConnectionQueryServices.java  |   5 -
 .../apache/phoenix/schema/MetaDataClient.java   |  36 ++-
 .../schema/stat/StatisticsCollector.java        | 249 +++++-------------
 .../phoenix/schema/stat/StatisticsScanner.java  |   7 +-
 .../phoenix/schema/stat/StatisticsTable.java    |  49 ++--
 .../phoenix/schema/stat/StatisticsTracker.java  |  62 -----
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 .../phoenix/query/QueryServicesTestImpl.java    |   2 +-
 .../src/main/StatisticsCollect.proto            |  20 --
 20 files changed, 272 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
index dd1dc8b..a6ec835 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -40,13 +41,18 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 /**
  * Tests for {@link DefaultParallelIteratorRegionSplitter}.
@@ -58,6 +64,14 @@ import org.junit.experimental.categories.Category; @Category(ClientManagedTimeTest.class) public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT { + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan) throws SQLException { TableRef tableRef = getTableRef(conn, ts); @@ -93,6 +107,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat Scan scan = new Scan(); // number of regions > target query concurrency + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); scan.setStartRow(K1); scan.setStopRow(K12); List<KeyRange> keyRanges = getSplits(conn, ts, scan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java index 7645040..3cef492 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java @@ -28,6 +28,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HRegionLocation; @@ -40,16 +41,32 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Maps; + @Category(HBaseManagedTimeTest.class) public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT { - + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20)); + props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + protected static final byte[] KMIN = new byte[] {'!'}; protected static final byte[] KMIN2 = new byte[] {'.'}; protected static final byte[] K1 = new byte[] {'a'}; @@ -106,16 +123,19 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT { upsert(new byte[][] { KMIN, K4, K11 }); stmt = conn.prepareStatement("ANALYZE STABLE"); stmt.execute(); + 
conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); keyRanges = getSplits(conn, scan); assertEquals(7, keyRanges.size()); upsert(new byte[][] { KMIN2, K5, K12 }); stmt = conn.prepareStatement("ANALYZE STABLE"); stmt.execute(); + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); keyRanges = getSplits(conn, scan); assertEquals(10, keyRanges.size()); upsert(new byte[][] { K1, K6, KP }); stmt = conn.prepareStatement("ANALYZE STABLE"); stmt.execute(); + conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); keyRanges = getSplits(conn, scan); assertEquals(13, keyRanges.size()); conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java index 4b0d07f..f713fff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java @@ -32,6 +32,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HRegionLocation; @@ -44,15 +45,29 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Maps; + @Category(ClientManagedTimeTest.class) public class KeyOnlyIT extends BaseClientManagedTimeIT { + + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } @Test public void testKeyOnly() throws Exception { long ts = nextTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java index ebf03d0..f01d985 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java @@ -32,6 +32,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HRegionLocation; @@ -44,17 +45,30 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.HintNode; import 
org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Maps; + @Category(ClientManagedTimeTest.class) public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { private static final String MULTI_CF = "MULTI_CF"; + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } protected static void initTableValues(long ts) throws Exception { ensureTableCreated(getUrl(),MULTI_CF,null, ts-2); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index a38abea..e20c11f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -1,10 +1,5 @@ package org.apache.phoenix.end2end; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; -import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertTrue; @@ -18,9 +13,6 @@ import java.sql.SQLException; import java.util.Map; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -32,27 +24,19 @@ import com.google.common.collect.Maps; @Category(HBaseManagedTimeTest.class) public class StatsCollectorIT extends BaseHBaseManagedTimeIT { - private static String url; - private static HBaseTestingUtility util; - private static int frequency = 4000; - + //private static String url; + private static int frequency = 5000; + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { - Configuration conf = HBaseConfiguration.create(); - setUpConfigForMiniCluster(conf); - conf.setInt("hbase.client.retries.number", 2); - conf.setInt("hbase.client.pause", 5000); - conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0); - util = new HBaseTestingUtility(conf); - util.startMiniCluster(); - String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); - url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + 
JDBC_PROTOCOL_SEPARATOR + clientPort - + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; - int histogramDepth = 60; - Map<String, String> props = Maps.newHashMapWithExpectedSize(3); - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Integer.toString(histogramDepth)); + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency)); - driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20)); + props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @Test @@ -62,7 +46,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT { ResultSet rs; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10)); - conn = DriverManager.getConnection(url, props); + conn = DriverManager.getConnection(getUrl(), props); conn.createStatement().execute( "CREATE TABLE t ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n"); @@ -99,7 +83,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT { long ts = nextTimestamp(); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10)); - conn = DriverManager.getConnection(url, props); + conn = DriverManager.getConnection(getUrl(), props); conn.createStatement().execute( "CREATE TABLE x ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n" + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n"); @@ -148,7 +132,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT { Connection conn; PreparedStatement stmt; // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30)); - conn = DriverManager.getConnection(url, props); + conn = DriverManager.getConnection(getUrl(), props); stmt = upsertStmt(conn, tableName); stmt.setString(1, "a"); String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" }; @@ -219,7 +203,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT { } private void flush(String tableName) throws IOException, InterruptedException { - util.getHBaseAdmin().flush(tableName.toUpperCase()); + //utility.getHBaseAdmin().flush(tableName.toUpperCase()); } private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index b2e2806..1129eef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -60,6 +60,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String VIEW_CONSTANTS = 
"_ViewConstants"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; + public static final String ANALYZE_TABLE = "_ANALYZETABLE"; /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 95b095e..d39f868 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -34,6 +34,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HRegionInfo; @@ -47,7 +49,11 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.coprocessor.generated.PTableProtos; @@ -63,6 +69,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.query.QueryConstants; @@ -74,6 +81,8 @@ import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.stat.StatisticsCollector; +import org.apache.phoenix.schema.stat.StatisticsTable; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; @@ -94,7 +103,7 @@ import com.google.common.collect.Sets; * * @since 0.1 */ -public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver { +public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // TODO: move all constants into a single class public static final String UNGROUPED_AGG = "UngroupedAgg"; public static final String DELETE_AGG = "DeleteAgg"; @@ -105,6 +114,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public static final String EMPTY_CF = "EmptyCF"; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); 
private KeyValueBuilder kvBuilder; + private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class); + private StatisticsTable statsTable = null; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -112,6 +123,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // Can't use ClientKeyValueBuilder on server-side because the memstore expects to // be able to get a single backing buffer for a KeyValue. this.kvBuilder = GenericKeyValueBuilder.INSTANCE; + String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getTableName().getNameAsString(); + this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name); } private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException { @@ -128,16 +141,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public static void serializeIntoScan(Scan scan) { scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE); } - + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; + boolean isAnalyze = false; + HRegion region = c.getEnvironment().getRegion(); + TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); + StatisticsCollector stats = null; + if(scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null && statsTable != null) { + stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration()); + isAnalyze = true; + } if (ScanUtil.isLocalIndex(scan)) { /* * For local indexes, we need to set an offset on row key expressions to skip * the region start key. */ - HRegion region = c.getEnvironment().getRegion(); offset = region.getStartKey().length != 0 ? 
region.getStartKey().length:region.getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } @@ -199,7 +219,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int batchSize = 0; long ts = scan.getTimeRange().getMax(); - HRegion region = c.getEnvironment().getRegion(); List<Mutation> mutations = Collections.emptyList(); boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { @@ -214,7 +233,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver boolean hasAny = false; MultiKeyValueTuple result = new MultiKeyValueTuple(); if (logger.isInfoEnabled()) { - logger.info("Starting ungrouped coprocessor scan " + scan); + logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo()); } long rowCount = 0; region.startRegionOperation(); @@ -226,6 +245,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // since this is an indication of whether or not there are more values after the // ones returned hasMore = innerScanner.nextRaw(results); + if(isAnalyze && stats != null) { + stats.collectStatistics(results); + } if (!results.isEmpty()) { if (localIndexScan && !isDelete) { @@ -346,6 +368,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); } finally { try { + if (isAnalyze && stats != null) { + stats.updateStatistic(region); + stats.clear(); + } innerScanner.close(); } finally { region.closeRegionOperation(); @@ -408,6 +434,42 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver }; return scanner; } + + @Override + public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, + Store store, List<? 
extends KeyValueScanner> scanners, ScanType scanType, + long earliestPutTs, InternalScanner s) throws IOException { + InternalScanner internalScan = s; + TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); + if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) + && scanType.equals(ScanType.COMPACT_DROP_DELETES)) { + StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration()); + internalScan = + stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s); + } + return internalScan; + } + + + @Override + public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) + throws IOException { + HRegion region = e.getEnvironment().getRegion(); + TableName table = region.getRegionInfo().getTable(); + if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + try { + StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment() + .getConfiguration()); + stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region); + stats.clear(); + } catch (IOException ioe) { + if(LOG.isDebugEnabled()) { + LOG.debug("Error while collecting stats during split ",ioe); + } + } + } + + } private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException { HRegion userRegion = environment.getRegion(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java index 227163e..a0ac20c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java @@ -33,6 +33,7 @@ import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,23 +112,24 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe PTable table = this.tableRef.getTable(); byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); List<byte[]> gps = Lists.newArrayList(); - - if (table.getColumnFamilies().isEmpty()) { - // For sure we can get the defaultCF from the table - gps = table.getGuidePosts(); - } else { - try { - if (scan.getFamilyMap().size() > 0) { - if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan + if (!ScanUtil.isAnalyzeTable(scan)) { + if (table.getColumnFamilies().isEmpty()) { + // For sure we can get the defaultCF from the table + gps = table.getGuidePosts(); + } else { + try { + if (scan.getFamilyMap().size() > 0) { + if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan + gps = table.getColumnFamily(defaultCF).getGuidePosts(); + } else { // Otherwise, just use first CF in use by scan + gps = 
table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts(); + } + } else { gps = table.getColumnFamily(defaultCF).getGuidePosts(); - } else { // Otherwise, just use first CF in use by scan - gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts(); } - } else { - gps = table.getColumnFamily(defaultCF).getGuidePosts(); + } catch (ColumnFamilyNotFoundException cfne) { + // Alter table does this } - } catch (ColumnFamilyNotFoundException cfne) { - // Alter table does this } } List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 0c1f45d..15c8ebe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -94,9 +94,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated void addConnection(PhoenixConnection connection) throws SQLException; void removeConnection(PhoenixConnection connection) throws SQLException; - long updateStatistics(KeyRange keyRange, byte[] tableName) - throws SQLException; - /** * @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase. */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index dfd56bc..25117ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -88,9 +88,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -124,7 +121,6 @@ import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.schema.stat.StatisticsCollector; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; @@ -143,7 +139,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; import com.google.protobuf.HBaseZeroCopyByteString; -import com.google.protobuf.ServiceException; public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { @@ -595,10 +590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); } - - if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) { - descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null); - } // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also, // don't install on the metadata table until we fix the TODO there. @@ -1864,47 +1855,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - - @Override - public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException { - HTableInterface ht = null; - try { - ht = this.getTable(tableName); - Batch.Call<StatCollectService, StatCollectResponse> callable = new Batch.Call<StatCollectService, StatCollectResponse>() { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<StatCollectResponse> rpcCallback = new BlockingRpcCallback<StatCollectResponse>(); - - @Override - public StatCollectResponse call(StatCollectService service) throws IOException { - StatCollectRequest.Builder builder = StatCollectRequest.newBuilder(); - builder.setStartRow(HBaseZeroCopyByteString.wrap(keyRange.getLowerRange())); - builder.setStopRow(HBaseZeroCopyByteString.wrap(keyRange.getUpperRange())); - service.collectStat(controller, builder.build(), rpcCallback); - if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } - return rpcCallback.get(); - } - }; - Map<byte[], StatCollectResponse> result = ht.coprocessorService(StatCollectService.class, - keyRange.getLowerRange(), keyRange.getUpperRange(), callable); - StatCollectResponse next = result.values().iterator().next(); - return next.getRowsScanned(); - } catch (ServiceException e) { - throw new SQLException("Unable to update the statistics for the table " + tableName, e); - } catch (TableNotFoundException e) { - throw new SQLException("Unable to update the statistics for the table " + tableName, e); - } catch (Throwable e) { - throw new SQLException("Unable to update the statistics for the table " + tableName, e); - } finally { - if (ht != null) { - try { - ht.close(); - } catch (IOException e) { - throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e); - } - } - } - } - + @Override public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, final long clientTS) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 9fa415c..055bc79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -188,12 +188,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException { return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null); } - - @Override - public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException { - // Noop - return 0; - } @Override public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS) http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index fa01f09..8bd2c61 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -226,11 +226,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public String getUserName() { return getDelegate().getUserName(); } - - @Override - public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException { - return getDelegate().updateStatistics(keyRange, tableName); - } @Override public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS) http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 1f933d8..82eb836 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -130,7 +130,6 @@ import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.PrimaryKeyConstraint; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.UpdateStatisticsStatement; -import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -141,7 +140,6 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -479,20 +477,6 @@ public class MetaDataClient { PTable table = resolver.getTables().get(0).getTable(); PName physicalName = table.getPhysicalName(); byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY; - KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE; - if (connection.getTenantId() != null && table.isMultiTenant()) { - tenantIdBytes = connection.getTenantId().getBytes(); - // TODO remove this inner if once PHOENIX-1259 is fixed. 
- if (table.getBucketNum() == null && table.getIndexType() != IndexType.LOCAL) { - List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange - .getKeyRange(tenantIdBytes))); - byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges, - ScanUtil.SINGLE_COLUMN_SLOT_SPAN); - byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges, - ScanUtil.SINGLE_COLUMN_SLOT_SPAN); - analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange); - } - } Long scn = connection.getSCN(); // Always invalidate the cache long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn; @@ -509,12 +493,26 @@ public class MetaDataClient { lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime(); } if (minTimeForStatsUpdate > lastUpdatedTime) { + // Here create the select query. + String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString(); + PhoenixStatement statement = (PhoenixStatement) connection.createStatement(); + QueryPlan plan = statement.compileQuery(countQuery); + Scan scan = plan.getContext().getScan(); + // Add all CF in the table + scan.getFamilyMap().clear(); + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, PDataType.TRUE_BYTES); + Cell kv = plan.iterator().next().getValue(0); + ImmutableBytesWritable tempPtr = plan.getContext().getTempPtr(); + tempPtr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); + // A single Cell will be returned with the count(*) - we decode that here + long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault()); // We need to update the stats table - connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes()); connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(), table.getTableName().getBytes(), clientTS); - updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true); - return new MutationState(1, connection); + return new MutationState(0, connection, rowCount); } else { return new MutationState(0, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java index 7552698..6b45c5e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -21,27 +21,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.CoprocessorException; -import org.apache.hadoop.hbase.coprocessor.CoprocessorService; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -50,13 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse.Builder; -import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PDataType; @@ -64,91 +46,50 @@ import org.apache.phoenix.schema.PhoenixArray; import org.apache.phoenix.util.TimeKeeper; import com.google.common.collect.Lists; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; +import com.google.common.collect.Maps; /** - * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an - * RegionObserver that collects the information on compaction also. The user would be allowed to invoke this endpoint and thus populate the - * Phoenix stats table with the max key, min key and guide posts for the given region. 
The stats can be consumed by the stats associated - * with every PTable and the same can be used to parallelize the queries + * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and + * guideposts */ -public class StatisticsCollector extends BaseRegionObserver implements CoprocessorService, Coprocessor, - StatisticsTracker, StatCollectService.Interface { +public class StatisticsCollector { - public static void addToTable(HTableDescriptor desc) throws IOException { - desc.addCoprocessor(StatisticsCollector.class.getName()); - } - - private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>(); - private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>(); + private Map<String, byte[]> minMap = Maps.newHashMap(); + private Map<String, byte[]> maxMap = Maps.newHashMap(); private long guidepostDepth; private long byteCount = 0; - private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>(); - private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>(); - private RegionCoprocessorEnvironment env; - protected StatisticsTable stats; + private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap(); + private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap(); + protected StatisticsTable statsTable; // Ensures that either analyze or compaction happens at any point of time. - private ReentrantLock lock = new ReentrantLock(); private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); - @Override - public void collectStat(RpcController controller, StatCollectRequest request, RpcCallback<StatCollectResponse> done) { - HRegion region = env.getRegion(); - boolean heldLock = false; - int count = 0; - Builder newBuilder = StatCollectResponse.newBuilder(); + public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException { + // Get the stats table associated with the current table on which the CP is + // triggered + this.statsTable = statsTable; + guidepostDepth = + conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, + QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); + } + + public void updateStatistic(HRegion region) { try { - if (lock.tryLock()) { - heldLock = true; - // Clear all old stats - clear(); - Scan scan = createScan(env.getConfiguration()); - if (request.hasStartRow()) { - scan.setStartRow(request.getStartRow().toByteArray()); - } - if (request.hasStopRow()) { - scan.setStopRow(request.getStopRow().toByteArray()); - } - RegionScanner scanner = null; - try { - scanner = region.getScanner(scan); - count = scanRegion(scanner, count); - } catch (IOException e) { - LOG.error(e); - ResponseConverter.setControllerException(controller, e); - } finally { - if (scanner != null) { - try { - ArrayList<Mutation> mutations = new ArrayList<Mutation>(); - writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); - if (LOG.isDebugEnabled()) { - LOG.debug("Committing new stats for the region " + region.getRegionInfo()); - } - commitStats(mutations); - } catch (IOException e) { - LOG.error(e); - ResponseConverter.setControllerException(controller, e); - } finally { - clear(); - } - } - } + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + region.getRegionInfo()); } + 
commitStats(mutations); + } catch (IOException e) { + LOG.error(e); } finally { - if (heldLock) { - lock.unlock(); - } - newBuilder.setRowsScanned(count); - StatCollectResponse result = newBuilder.build(); - done.run(result); + clear(); } } - + private void writeStatsToStatsTable(final HRegion region, - final RegionScanner scanner, boolean delete, List<Mutation> mutations, long currentTime) throws IOException { - scanner.close(); + boolean delete, List<Mutation> mutations, long currentTime) throws IOException { try { // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { @@ -157,13 +98,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess if(LOG.isDebugEnabled()) { LOG.debug("Deleting the stats for the region "+region.getRegionInfo()); } - stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this, + statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); } if(LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region "+region.getRegionInfo()); } - stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); } } catch (IOException e) { @@ -173,7 +114,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess } private void commitStats(List<Mutation> mutations) throws IOException { - stats.commitStats(mutations); + statsTable.commitStats(mutations); } private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException { @@ -181,7 +122,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { String tableName = region.getRegionInfo().getTable().getNameAsString(); - stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); } } catch (IOException e) { @@ -196,7 +137,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess while (hasMore) { // Am getting duplicates here. 
Need to avoid that hasMore = scanner.next(results); - updateStat(results); + collectStatistics(results); count += results.size(); results.clear(); while (!hasMore) { @@ -212,93 +153,42 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess * @param results * next batch of {@link KeyValue}s */ - protected void updateStat(final List<Cell> results) { + public void collectStatistics(final List<Cell> results) { for (Cell c : results) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); updateStatistic(kv); } } - @Override - public Service getService() { - return StatCollectorProtos.StatCollectService.newReflectiveService(this); - } - - @Override - public void start(CoprocessorEnvironment env) throws IOException { - if (env instanceof RegionCoprocessorEnvironment) { - this.env = (RegionCoprocessorEnvironment)env; - } else { - throw new CoprocessorException("Must be loaded on a table region!"); - } - HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc(); - // Get the stats table associated with the current table on which the CP is - // triggered - stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName()); - guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, - QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); - } - - @Override - public void stop(CoprocessorEnvironment env) throws IOException { - if (env instanceof RegionCoprocessorEnvironment) { - TableName table = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTable(); - // Close only if the table is system table - if(table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { - stats.close(); - } - } - } - - @Override - public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) - throws IOException { + public InternalScanner createCompactionScanner(HRegion region, Store store, + List<? 
extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException { + // See if this is for Major compaction InternalScanner internalScan = s; - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); - if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { - boolean heldLock = false; - try { - if (lock.tryLock()) { - heldLock = true; - // See if this is for Major compaction - if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { - // this is the first CP accessed, so we need to just create a major - // compaction scanner, just - // like in the compactor - if (s == null) { - Scan scan = new Scan(); - scan.setMaxVersions(store.getFamily().getMaxVersions()); - long smallestReadPoint = store.getSmallestReadPoint(); - internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, - smallestReadPoint, earliestPutTs); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Compaction scanner created for stats"); - } - InternalScanner scanner = getInternalScanner(c, store, internalScan, - store.getColumnFamilyName()); - if (scanner != null) { - internalScan = scanner; - } - } - } - } finally { - if (heldLock) { - lock.unlock(); - } + if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { + // this is the first CP accessed, so we need to just create a major + // compaction scanner, just + // like in the compactor + if (s == null) { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + long smallestReadPoint = store.getSmallestReadPoint(); + internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, + smallestReadPoint, earliestPutTs); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Compaction scanner created for stats"); + } + InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName()); + if (scanner != null) { + internalScan = scanner; } } return internalScan; } - - @Override - public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException { - // Invoke collectStat here - HRegion region = ctx.getEnvironment().getRegion(); - TableName table = region.getRegionInfo().getTable(); - if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, + HRegion region) { + try { if (familyMap != null) { familyMap.clear(); } @@ -307,14 +197,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess // TODO : Try making this atomic List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); - Configuration conf = ctx.getEnvironment().getConfiguration(); - if(LOG.isDebugEnabled()) { - LOG.debug("Collecting stats for the daughter region "+l.getRegionInfo()); + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo()); } collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime); clear(); - if(LOG.isDebugEnabled()) { - LOG.debug("Collecting stats for the daughter region "+r.getRegionInfo()); + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo()); } collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime); clear(); @@ -322,6 +211,9 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess LOG.debug("Committing stats 
for the daughter regions as part of split " + r.getRegionInfo()); } commitStats(mutations); + } catch (IOException e) { + LOG.error("Error while capturing stats after split of region " + + region.getRegionInfo().getRegionNameAsString(), e); } } @@ -345,7 +237,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess } deleteStatsFromStatsTable(parent, mutations, currentTime); } - writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime); + writeStatsToStatsTable(daughter, false, mutations, currentTime); } catch (IOException e) { LOG.error(e); throw e; @@ -363,13 +255,12 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess return scan; } - protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + protected InternalScanner getInternalScanner(HRegion region, Store store, InternalScanner internalScan, String family) { - return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan, + return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan, Bytes.toBytes(family)); } - @Override public void clear() { this.maxMap.clear(); this.minMap.clear(); @@ -377,7 +268,6 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess this.familyMap.clear(); } - @Override public void updateStatistic(KeyValue kv) { byte[] cf = kv.getFamily(); familyMap.put(new ImmutableBytesPtr(cf), true); @@ -415,19 +305,16 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess } } - @Override public byte[] getMaxKey(String fam) { if (maxMap.get(fam) != null) { return maxMap.get(fam); } return null; } - @Override public byte[] getMinKey(String fam) { if (minMap.get(fam) != null) { return minMap.get(fam); } return null; } - @Override public byte[] getGuidePosts(String fam) { if (!guidePostsMap.isEmpty()) { List<byte[]> guidePosts = guidePostsMap.get(fam); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java index 09174b2..86ffca7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java @@ -32,10 +32,10 @@ public class StatisticsScanner implements InternalScanner { private InternalScanner delegate; private StatisticsTable stats; private HRegionInfo region; - private StatisticsTracker tracker; + private StatisticsCollector tracker; private byte[] family; - public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region, + public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region, InternalScanner delegate, byte[] family) { // should there be only one tracker? 
this.tracker = tracker; @@ -109,9 +109,6 @@ public class StatisticsScanner implements InternalScanner { delegate.close(); } catch (IOException e) { LOG.error("Error while closing the scanner"); - // TODO : We should throw the exception - /*if (toThrow == null) { throw e; } - throw MultipleIOException.createIOException(Lists.newArrayList(toThrow, e));*/ } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java index fcbbee9..e92d61e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -25,15 +25,11 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; @@ -46,38 +42,34 @@ public class StatisticsTable implements Closeable { /** Map of the currently open statistics tables */ private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>(); /** - * @param env - * Environment wherein the coprocessor is attempting to update the stats table. + * @param conf + * Configuration to update the stats table. * @param primaryTableName * name of the primary table on which we should collect stats * @return the {@link StatisticsTable} for the given primary table. * @throws IOException * if the table cannot be created due to an underlying HTable creation error */ - public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env, - byte[] primaryTableName) throws IOException { + public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf, + String primaryTableName) throws IOException { StatisticsTable table = tableMap.get(primaryTableName); if (table == null) { // Map the statistics table and the table with which the statistics is // associated.
This is a workaround - HTablePool pool = new HTablePool(env.getConfiguration(), 1); + HTablePool pool = new HTablePool(conf,1); + //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); - table = new StatisticsTable(hTable, primaryTableName); - tableMap.put(Bytes.toString(primaryTableName), table); + //h.setAutoFlushTo(true); + table = new StatisticsTable(hTable); + tableMap.put(primaryTableName, table); } return table; } private final HTableInterface statisticsTable; - private final byte[] sourceTableName; - private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) { + public StatisticsTable(HTableInterface statsTable) { this.statisticsTable = statsTable; - this.sourceTableName = sourceTableName; - } - - public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException { - this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName()); } /** @@ -104,7 +96,7 @@ public class StatisticsTable implements Closeable { * if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to * update */ - public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations, long currentTime) throws IOException { if (tracker == null) { return; } @@ -119,13 +111,15 @@ public class StatisticsTable implements Closeable { public void commitStats(List<Mutation> mutations) throws IOException { Object[] res = new Object[mutations.size()]; try { - statisticsTable.batch(mutations, res); + if (mutations.size() > 0) { + statisticsTable.batch(mutations, res); + } } catch (InterruptedException e) { throw new IOException("Exception while adding deletes and puts"); } } - private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations, + private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations, long currentTime, byte[] prefix) { Put put = new Put(prefix, currentTime); if (tracker.getGuidePosts(fam) != null) { @@ -147,22 +141,11 @@ public class StatisticsTable implements Closeable { mutations.add(put); } - public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam, + public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations, long currentTime) throws IOException { byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); mutations.add(new Delete(prefix, currentTime - 1)); } - - /** - * @return the underlying {@link HTableInterface} to which this table is writing - */ - HTableInterface getUnderlyingTable() { - return statisticsTable; - } - - byte[] getSourceTableName() { - return this.sourceTableName; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java deleted file mode 100644 index e1754f3..0000000 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.schema.stat; - -import org.apache.hadoop.hbase.KeyValue; - -/** - * Track a statistic for the column on a given region - */ -public interface StatisticsTracker { - - /** - * Reset the statistic after the completion of the compaction - */ - public void clear(); - - /** - * Update the current statistics with the next {@link KeyValue} to be written - * - * @param kv - * next {@link KeyValue} to be written. - */ - public void updateStatistic(KeyValue kv); - - /** - * Return the max key of the family - * @param fam - * @return - */ - public byte[] getMaxKey(String fam); - - /** - * Return the min key of the family - * - * @param fam - * @return - */ - public byte[] getMinKey(String fam); - - /** - * Return the guide posts of the family - * - * @param fam - * @return - */ - public byte[] getGuidePosts(String fam); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 42b20fe..daef1c3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -552,4 +552,8 @@ public class ScanUtil { return offset + slotPosition; } + + public static boolean isAnalyzeTable(Scan scan) { + return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index a65ca77..6e29c69 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -51,7 +51,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final String DEFAULT_WAL_EDIT_CODEC = IndexedWALEditCodec.class.getName(); public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb - public static final long 
DEFAULT_HISTOGRAM_BYTE_DEPTH = 20; + public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 2000; public QueryServicesTestImpl(ReadOnlyProps defaultProps) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-protocol/src/main/StatisticsCollect.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/StatisticsCollect.proto b/phoenix-protocol/src/main/StatisticsCollect.proto deleted file mode 100644 index c80a756..0000000 --- a/phoenix-protocol/src/main/StatisticsCollect.proto +++ /dev/null @@ -1,20 +0,0 @@ -option java_package = "org.apache.phoenix.coprocessor.generated"; -option java_outer_classname = "StatCollectorProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - - -message StatCollectRequest { - optional bytes startRow = 1; - optional bytes stopRow = 2; -} - -message StatCollectResponse { - required uint64 rowsScanned = 1; -} - -service StatCollectService { - rpc collectStat(StatCollectRequest) - returns (StatCollectResponse); -} \ No newline at end of file
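For reference, a minimal sketch of how the new ScanUtil.isAnalyzeTable(Scan) helper added above can be exercised. The BaseScannerRegionObserver.ANALYZE_TABLE attribute name and ScanUtil.isAnalyzeTable come from this commit; the class name AnalyzeScanSketch and the attribute value used below are illustrative assumptions, since the helper only checks that the attribute is present on the Scan, not what it contains.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.util.ScanUtil;

public class AnalyzeScanSketch {
    public static void main(String[] args) {
        // Flag a scan as an ANALYZE (stats collection) scan; any non-null value
        // works because isAnalyzeTable only tests for the attribute's presence.
        Scan scan = new Scan();
        scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, new byte[] { 1 });

        // Server-side coprocessor code can branch on this flag, for example to
        // drive the StatisticsCollector instead of a normal aggregation scan.
        if (ScanUtil.isAnalyzeTable(scan)) {
            System.out.println("Scan is marked for statistics collection");
        }
    }
}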