Repository: phoenix
Updated Branches:
  refs/heads/3.0 d2cd2856b -> 84f634325
PHOENIX-1402 Don't recalculate stats on split

Conflicts:
	phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
	phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
	phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
	phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c9715571
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c9715571
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c9715571

Branch: refs/heads/3.0
Commit: c9715571a6bfb83c224459d0202a2e41e7cdde1a
Parents: d2cd285
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Nov 6 14:29:52 2014 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Nov 6 22:37:52 2014 -0800

----------------------------------------------------------------------
 dev/eclipse_prefs_phoenix.epf                   |   4 -
 .../phoenix/end2end/StatsCollectorIT.java       | 103 ++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java       |  44 ++++----
 .../phoenix/iterate/ParallelIterators.java      |   2 +-
 .../phoenix/schema/stats/GuidePostsInfo.java    |   3 +
 .../schema/stats/StatisticsCollector.java       | 105 ++++---------------
 .../phoenix/schema/stats/StatisticsUtil.java    |  11 ++
 .../phoenix/schema/stats/StatisticsWriter.java  |  70 ++++++++++++-
 .../org/apache/phoenix/util/UpgradeUtil.java    |  18 +++-
 pom.xml                                         |   4 +-
 10 files changed, 239 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
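[Editor's note] In broad strokes, this commit replaces "rescan both daughter regions and rebuild their stats" with "repartition the parent region's existing guideposts at the split boundary". The toy program below models that bookkeeping; it is a sketch, not Phoenix API, and the Map standing in for the SYSTEM.STATS table and all names in it are hypothetical.

import java.util.*;
import java.util.stream.Collectors;

// Toy model: on split, the parent's stats row is deleted and its guideposts
// are divided between the daughters -- no region rescan takes place.
public class SplitStatsSketch {
    public static void main(String[] args) {
        Map<String, List<String>> stats = new HashMap<>();
        stats.put("parent", Arrays.asList("a", "b", "c", "d", "e"));

        String boundary = "c"; // start key of the right daughter region
        List<String> gps = stats.remove("parent"); // the Delete on the parent row
        // A guidepost equal to the boundary is dropped: the region edge replaces it.
        stats.put("daughterLeft",
                gps.stream().filter(g -> g.compareTo(boundary) < 0).collect(Collectors.toList()));  // [a, b]
        stats.put("daughterRight",
                gps.stream().filter(g -> g.compareTo(boundary) > 0).collect(Collectors.toList()));  // [d, e]
        System.out.println(stats); // the daughters now cover the parent's key space
    }
}

The StatisticsWriter.splitStats() hunk further down implements the real version of this partitioning with Collections.binarySearch over the serialized guideposts.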
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/dev/eclipse_prefs_phoenix.epf
----------------------------------------------------------------------
diff --git a/dev/eclipse_prefs_phoenix.epf b/dev/eclipse_prefs_phoenix.epf
index e8c1a09..5a0d998 100644
--- a/dev/eclipse_prefs_phoenix.epf
+++ b/dev/eclipse_prefs_phoenix.epf
@@ -464,9 +464,7 @@ file_export_version=3.0
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.codeComplete.staticFieldPrefixes=
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.codeComplete.visibilityCheck=enabled
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-/instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-/instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.compliance=1.6
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.debug.lineNumber=generate
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.debug.localVariable=generate
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.debug.sourceFile=generate
@@ -516,7 +514,6 @@ file_export_version=3.0
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=error
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=error
-/instance/org.eclipse.jdt.core/org.eclipse.jdt.core.compiler.source=1.6
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
@@ -775,7 +772,6 @@ file_export_version=3.0
 /instance/org.eclipse.jdt.core/org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
 /instance/org.eclipse.jdt.debug.ui/org.eclipse.debug.ui.ExpressionView.org.eclipse.jdt.debug.ui.show_null_entries=true
 /instance/org.eclipse.jdt.debug.ui/org.eclipse.debug.ui.VariableView.org.eclipse.jdt.debug.ui.show_null_entries=true
-/instance/org.eclipse.jdt.launching/org.eclipse.jdt.launching.PREF_DEFAULT_ENVIRONMENTS_XML=<?xml version\="1.0" encoding\="UTF-8"?>\n<defaultEnvironments>\n<defaultEnvironment environmentId\="J2SE-1.5" vmId\="57,org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType13,1217539956414"/>\n<defaultEnvironment environmentId\="JavaSE-1.6" vmId\="57,org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType13,1217443741223"/>\n</defaultEnvironments>\n
 /instance/org.eclipse.jdt.ui/content_assist_disabled_computers=org.eclipse.jdt.ui.javaNoTypeProposalCategory\u0000org.eclipse.jdt.ui.textProposalCategory\u0000org.eclipse.jdt.ui.templateProposalCategory\u0000org.eclipse.jdt.ui.javaTypeProposalCategory\u0000
 /instance/org.eclipse.jdt.ui/content_assist_lru_history=<?xml version\="1.0" encoding\="UTF-8"?><history maxLHS\="100" maxRHS\="10"/>
 /instance/org.eclipse.jdt.ui/content_assist_number_of_computers=17

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index b9a0e88..71eaf42 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -34,13 +35,16 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -50,12 +54,15 @@ import com.google.common.collect.Maps;
 @Category(NeedsOwnMiniClusterTest.class)
 public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
     private static final String STATS_TEST_TABLE_NAME = "S";
+    private static final byte[] STATS_TEST_TABLE_BYTES = Bytes.toBytes(STATS_TEST_TABLE_NAME);
 
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
         // Must update config before starting server
         props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(10));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -275,4 +282,100 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT {
         assertEquals(nRows/2+1, keyRanges.size());
     }
 
+
+    private void splitTable(Connection conn, byte[] splitPoint) throws IOException, InterruptedException, SQLException {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        int nRegionsNow = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size();
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            admin.split(STATS_TEST_TABLE_BYTES, splitPoint);
+            int nTries = 0;
+            int nRegions;
+            do {
+                Thread.sleep(2000);
+                services.clearTableRegionCache(STATS_TEST_TABLE_BYTES);
+                nRegions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size();
+                nTries++;
+            } while (nRegions == nRegionsNow && nTries < 10);
+            if (nRegions == nRegionsNow) {
+                throw new IOException("Failed to complete split within allotted time");
+            }
+            // FIXME: I see the commit of the stats finishing before this with a lower timestamp than the scan timestamp,
+            // yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one.
+            Thread.sleep(5000);
+        } finally {
+            admin.close();
+        }
+    }
+
+    @Test
+    public void testSplitUpdatesStats() throws Exception {
+        int nRows = 10;
+        Connection conn;
+        PreparedStatement stmt;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("CREATE TABLE " + STATS_TEST_TABLE_NAME + "(k VARCHAR PRIMARY KEY, v INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
+        stmt = conn.prepareStatement("UPSERT INTO " + STATS_TEST_TABLE_NAME + " VALUES(?,?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt.setString(1, Character.toString((char) ('a' + i)));
+            stmt.setInt(2, i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+
+        TestUtil.analyzeTable(conn, STATS_TEST_TABLE_NAME);
+        List<KeyRange> keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+        assertEquals(nRows+1, keyRanges.size());
+
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        List<HRegionLocation> regions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES);
+        assertEquals(1, regions.size());
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT GUIDE_POSTS_COUNT, REGION_NAME FROM SYSTEM.STATS WHERE PHYSICAL_NAME='"+STATS_TEST_TABLE_NAME+"' AND REGION_NAME IS NOT NULL");
+        assertTrue(rs.next());
+        assertEquals(nRows, rs.getLong(1));
+        assertEquals(regions.get(0).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertFalse(rs.next());
+
+        byte[] midPoint = Bytes.toBytes(Character.toString((char) ('a' + (nRows/2))));
+        splitTable(conn, midPoint);
+
+        keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+        assertEquals(nRows+1, keyRanges.size()); // Same number as before because split was at guidepost
+
+        regions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES);
+        assertEquals(2, regions.size());
+        rs = conn.createStatement().executeQuery("SELECT GUIDE_POSTS_COUNT, REGION_NAME FROM SYSTEM.STATS WHERE PHYSICAL_NAME='"+STATS_TEST_TABLE_NAME+"' AND REGION_NAME IS NOT NULL");
+        assertTrue(rs.next());
+        assertEquals(nRows/2, rs.getLong(1));
+        assertEquals(regions.get(0).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertTrue(rs.next());
+        assertEquals(nRows/2 - 1, rs.getLong(1));
+        assertEquals(regions.get(1).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertFalse(rs.next());
+
+        byte[] midPoint2 = Bytes.toBytes("cj");
+        splitTable(conn, midPoint2);
+
+        keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
+        assertEquals(nRows+2, keyRanges.size()); // One extra due to split between guideposts
+
+        regions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES);
+        assertEquals(3, regions.size());
+        rs = conn.createStatement().executeQuery("SELECT GUIDE_POSTS_COUNT, REGION_NAME FROM SYSTEM.STATS WHERE PHYSICAL_NAME='"+STATS_TEST_TABLE_NAME+"' AND REGION_NAME IS NOT NULL");
+        assertTrue(rs.next());
+        assertEquals(3, rs.getLong(1));
+        assertEquals(regions.get(0).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getLong(1));
+        assertEquals(regions.get(1).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertTrue(rs.next());
+        assertEquals(nRows/2 - 1, rs.getLong(1));
+        assertEquals(regions.get(2).getRegionInfo().getRegionNameAsString(), rs.getString(2));
+        assertFalse(rs.next());
+
+        conn.close();
+    }
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7d7159b..4cf816a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -64,7 +64,6 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -194,8 +193,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         boolean hasMore;
         boolean hasAny = false;
         MultiKeyValueTuple result = new MultiKeyValueTuple();
-        if (logger.isInfoEnabled()) {
-            logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
+        if (logger.isDebugEnabled()) {
+            logger.debug("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
         }
         long rowCount = 0;
         MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
@@ -320,8 +319,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
         }
 
-        if (logger.isInfoEnabled()) {
-            logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
         }
 
         if (!mutations.isEmpty()) {
@@ -458,27 +457,24 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throws IOException {
         HRegion region = e.getEnvironment().getRegion();
         String table = region.getRegionInfo().getTableNameAsString();
-        if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-            StatisticsCollector stats = null;
-            try {
-                boolean useCurrentTime =
-                        e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
-                                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
-                // Provides a means of clients controlling their timestamps to not use current time
-                // when background tasks are updating stats. Instead we track the max timestamp of
-                // the cells and use that.
-                long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
-                stats = new StatisticsCollector(e.getEnvironment(), table, clientTimeStamp);
-                stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
-            } catch (IOException ioe) {
-                if(logger.isDebugEnabled()) {
-                    logger.debug("Error while collecting stats during split ",ioe);
-                }
-            } finally {
-                if (stats != null) stats.close();
+        StatisticsCollector stats = null;
+        try {
+            boolean useCurrentTime =
+                    e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+            // Provides a means of clients controlling their timestamps to not use current time
+            // when background tasks are updating stats. Instead we track the max timestamp of
+            // the cells and use that.
+            long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP;
+            stats = new StatisticsCollector(e.getEnvironment(), table, clientTimeStamp);
+            stats.splitStats(region, l, r);
+        } catch (IOException ioe) {
+            if(logger.isWarnEnabled()) {
+                logger.warn("Error while collecting stats during split for " + table, ioe);
             }
+        } finally {
+            if (stats != null) stats.close();
         }
-    }
 
     public static byte[] serialize(List<Expression> selectExpressions) {
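[Editor's note] The comment retained in the hunk above describes two timestamping modes for stats writes. A compact, self-contained restatement of that choice follows; the -1 sentinel is illustrative only (the real constant is StatisticsCollector.NO_TIMESTAMP), and this is a sketch rather than Phoenix code.

// Sketch of the timestamp selection described in the comment above.
public class StatsTimestampSketch {
    static final long NO_TIMESTAMP = -1; // illustrative stand-in for the Phoenix sentinel

    // useCurrentTime == true: stats cells are stamped with the wall clock.
    // useCurrentTime == false: the sentinel defers to the max cell timestamp
    // observed, so clients controlling their own timestamps see consistent stats.
    static long chooseWriteTimestamp(boolean useCurrentTime, long now, long maxCellTimestamp) {
        long clientTimeStamp = useCurrentTime ? now : NO_TIMESTAMP;
        return clientTimeStamp == NO_TIMESTAMP ? maxCellTimestamp : clientTimeStamp;
    }

    public static void main(String[] args) {
        System.out.println(chooseWriteTimestamp(true, 1000L, 900L));  // 1000: wall clock wins
        System.out.println(chooseWriteTimestamp(false, 1000L, 900L)); // 900: max cell timestamp
    }
}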
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index f5c4027..42890bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -345,7 +345,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators {
             for (int i = 0; i < gps.size(); i++) {
                 buf.append(Bytes.toStringBinary(gps.get(i)));
                 buf.append(",");
-                if (i+1 < gps.size() && ((i+1) % 10) == 0) {
+                if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
                     buf.append("\n");
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
index 6484349..b4aa0b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java
@@ -4,6 +4,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.util.Bytes;
@@ -34,6 +35,8 @@ import com.google.common.collect.Lists;
  * A simple POJO class that holds the information related to GuidePosts serDe.
  */
 public class GuidePostsInfo {
+    public static final GuidePostsInfo EMPTY_GUIDE_POSTS_INFO = new GuidePostsInfo(0L, Collections.<byte[]>emptyList());
+
     private long byteCount; // Number of bytes traversed in the region
     private long keyByteSize; // Total number of bytes in keys stored in guidePosts
     private List<byte[]> guidePosts;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index a90c095..61fb7da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -19,12 +19,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -57,7 +55,6 @@ public class StatisticsCollector {
     private Map<String, byte[]> minMap = Maps.newHashMap();
     private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
-    private boolean useCurrentTime;
     private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
     private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap();
     // Tracks the bytecount per family if it has reached the guidePostsDepth
@@ -68,9 +65,6 @@ public class StatisticsCollector {
             throws IOException {
         Configuration config = env.getConfiguration();
         HTableInterface statsHTable = env.getTable((PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
-        useCurrentTime =
-                config.getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
-                        QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
         int guidepostPerRegion = config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);
         if (guidepostPerRegion > 0) {
             long maxFileSize = statsHTable.getTableDescriptor().getMaxFileSize();
@@ -139,36 +133,6 @@ public class StatisticsCollector {
         statsTable.commitStats(mutations);
     }
 
-    private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime)
-            throws IOException {
-        try {
-            String regionName = region.getRegionInfo().getRegionNameAsString();
-            // update the statistics table
-            for (ImmutableBytesPtr fam : familyMap.keySet()) {
-                statsTable.deleteStats(regionName, this, Bytes.toString(fam.copyBytesIfNecessary()),
-                        mutations);
-            }
-        } catch (IOException e) {
-            logger.error("Failed to delete from statistics table!", e);
-            throw e;
-        }
-    }
-
-    private int scanRegion(RegionScanner scanner, int count) throws IOException {
-        List<KeyValue> results = new ArrayList<KeyValue>();
-        boolean hasMore = true;
-        while (hasMore) {
-            hasMore = scanner.next(results);
-            collectStatistics(results);
-            count += results.size();
-            results.clear();
-            while (!hasMore) {
-                break;
-            }
-        }
-        return count;
-    }
-
     public void collectStatistics(final List<KeyValue> results) {
         for (KeyValue kv : results) {
             updateStatistic(kv);
@@ -183,67 +147,24 @@ public class StatisticsCollector {
         return getInternalScanner(region, store, s, store.getColumnFamilyName());
     }
 
-    public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion region)
-            throws IOException {
-        // Invoke collectStat here
+    public void splitStats(HRegion parent, HRegion left, HRegion right) {
         try {
-            // Create a delete operation on the parent region
-            // Then write the new guide posts for individual regions
-            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
-
-            long currentTime = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : -1;
-            deleteStatsFromStatsTable(region, mutations, currentTime);
             if (logger.isDebugEnabled()) {
-                logger.debug("Collecting stats for the daughter region " + l.getRegionInfo());
+                logger.debug("Collecting stats for split of " + parent.getRegionInfo() + " into " + left.getRegionInfo() + " and " + right.getRegionInfo());
             }
-            collectStatsForSplitRegions(conf, l, mutations, currentTime);
-            if (logger.isDebugEnabled()) {
-                logger.debug("Collecting stats for the daughter region " + r.getRegionInfo());
+            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
+            for (byte[] fam : parent.getStores().keySet()) {
+                statsTable.splitStats(parent, left, right, this, Bytes.toString(fam), mutations);
             }
-            collectStatsForSplitRegions(conf, r, mutations, currentTime);
             if (logger.isDebugEnabled()) {
-                logger.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+                logger.debug("Committing stats for the daughter regions as part of split " + parent.getRegionInfo());
             }
+            statsTable.commitStats(mutations);
         } catch (IOException e) {
             logger.error("Error while capturing stats after split of region "
-                    + region.getRegionInfo().getRegionNameAsString(), e);
-        }
-    }
-
-    private void collectStatsForSplitRegions(Configuration conf, HRegion daughter,
-            List<Mutation> mutations, long currentTime) throws IOException {
-        IOException toThrow = null;
-        clear();
-        Scan scan = createScan(conf);
-        RegionScanner scanner = null;
-        int count = 0;
-        try {
-            scanner = daughter.getScanner(scan);
-            count = scanRegion(scanner, count);
-            writeStatsToStatsTable(daughter, false, mutations, currentTime);
-        } catch (IOException e) {
-            logger.error("Unable to collects stats during split", e);
-            toThrow = e;
-        } finally {
-            try {
-                if (scanner != null) scanner.close();
-            } catch (IOException e) {
-                logger.error("Unable to close scanner after split", e);
-                if (toThrow != null) toThrow = e;
-            } finally {
-                if (toThrow != null) throw toThrow;
-            }
+                    + parent.getRegionInfo().getRegionNameAsString(), e);
         }
     }
-
-    private Scan createScan(Configuration conf) {
-        Scan scan = new Scan();
-        scan.setCaching(conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE));
-        // do not cache the blocks here
-        scan.setCacheBlocks(false);
-        return scan;
-    }
 
     protected InternalScanner getInternalScanner(HRegion region, Store store,
             InternalScanner internalScan, String family) {
@@ -259,6 +180,16 @@ public class StatisticsCollector {
         maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
     }
 
+    public void addGuidePost(String fam, GuidePostsInfo info, long byteSize, long timestamp) {
+        Pair<Long,GuidePostsInfo> newInfo = new Pair<Long,GuidePostsInfo>(byteSize,info);
+        Pair<Long,GuidePostsInfo> oldInfo = guidePostsMap.put(fam, newInfo);
+        if (oldInfo != null) {
+            info.combine(oldInfo.getSecond());
+            newInfo.setFirst(oldInfo.getFirst() + newInfo.getFirst());
+        }
+        maxTimeStamp = Math.max(maxTimeStamp, timestamp);
+    }
+
     public void updateStatistic(KeyValue kv) {
         byte[] cf = kv.getFamily();
         familyMap.put(new ImmutableBytesPtr(cf), true);
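[Editor's note] The new addGuidePost() above relies on Map.put returning the previous value so an existing per-family entry can be folded into its replacement. A standalone illustration of that merge-on-put idiom (plain Java, not Phoenix code; long[] stands in for the Pair/GuidePostsInfo pairing):

import java.util.HashMap;
import java.util.Map;

public class MergeOnPutExample {
    public static void main(String[] args) {
        Map<String, long[]> byFamily = new HashMap<>();
        accumulate(byFamily, "0", 100, 3); // first write for family "0"
        accumulate(byFamily, "0", 50, 2);  // merged with the earlier entry
        long[] merged = byFamily.get("0");
        System.out.println(merged[0] + " bytes, " + merged[1] + " guideposts"); // 150 bytes, 5 guideposts
    }

    static void accumulate(Map<String, long[]> map, String family, long bytes, long posts) {
        long[] next = { bytes, posts };
        long[] prev = map.put(family, next); // put() hands back what was there before
        if (prev != null) {                  // combine, mirroring GuidePostsInfo.combine()
            next[0] += prev[0];
            next[1] += prev[1];
        }
    }
}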
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index a48b04a..c47ad24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -58,6 +59,16 @@ public class StatisticsUtil {
         return rowKey;
     }
 
+    public static Result readRegionStatistics(HTableInterface statsHTable, byte[] tableNameBytes, byte[] cf, byte[] regionName, long clientTimeStamp)
+            throws IOException {
+        byte[] prefix = StatisticsUtil.getRowKey(tableNameBytes, cf, regionName);
+        Get get = new Get(prefix);
+        get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
+        get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
+        return statsHTable.get(get);
+    }
+
     public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp)
             throws IOException {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 4118bb9..e28f805 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -20,17 +20,23 @@ package org.apache.phoenix.schema.stats;
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.Date;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.TimeKeeper;
@@ -68,6 +74,56 @@ public class StatisticsWriter implements Closeable {
         statisticsTable.close();
     }
 
+    public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
+        if (tracker == null) { return; }
+        boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
+        if (!useMaxTimeStamp) {
+            mutations.add(getLastStatsUpdatedTimePut(clientTimeStamp));
+        }
+        long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp;
+        byte[] famBytes = PDataType.VARCHAR.toBytes(fam);
+        Result result = StatisticsUtil.readRegionStatistics(statisticsTable, tableName, famBytes, p.getRegionName(), readTimeStamp);
+        if (result != null && !result.isEmpty()) {
+            KeyValue cell = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
+
+            if (cell != null) {
+                long writeTimeStamp = useMaxTimeStamp ? cell.getTimestamp() : clientTimeStamp;
+
+                GuidePostsInfo guidePosts = GuidePostsInfo.fromBytes(cell.getBuffer(), cell.getValueOffset(), cell.getValueLength());
+                byte[] pPrefix = StatisticsUtil.getRowKey(tableName, famBytes, p.getRegionName());
+                mutations.add(new Delete(pPrefix, writeTimeStamp));
+
+                long byteSize = 0;
+                KeyValue byteSizeCell = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
+                if (byteSizeCell != null) {
+                    byteSize = PDataType.LONG.getCodec().decodeLong(byteSizeCell.getBuffer(), byteSizeCell.getValueOffset(), SortOrder.getDefault()) / 2;
+                }
+                int midEndIndex, midStartIndex;
+                int index = Collections.binarySearch(guidePosts.getGuidePosts(), r.getStartKey(), Bytes.BYTES_COMPARATOR);
+                if (index < 0) {
+                    midEndIndex = midStartIndex = -(index + 1);
+                } else {
+                    // For an exact match, we want to get rid of the exact match guidepost,
+                    // since it's replaced by the region boundary.
+                    midEndIndex = index;
+                    midStartIndex = index + 1;
+                }
+                if (midEndIndex > 0) {
+                    GuidePostsInfo lguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(0, midEndIndex));
+                    tracker.clear();
+                    tracker.addGuidePost(fam, lguidePosts, byteSize, cell.getTimestamp());
+                    addStats(l.getRegionNameAsString(), tracker, fam, mutations);
+                }
+                if (midStartIndex < guidePosts.getGuidePosts().size()) {
+                    GuidePostsInfo rguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(midStartIndex, guidePosts.getGuidePosts().size()));
+                    tracker.clear();
+                    tracker.addGuidePost(fam, rguidePosts, byteSize, cell.getTimestamp());
+                    addStats(r.getRegionNameAsString(), tracker, fam, mutations);
+                }
+            }
+        }
+    }
+
     /**
      * Update a list of statistics for a given region. If the ANALYZE <tablename> query is issued
      * then we use Upsert queries to update the table
@@ -103,10 +159,16 @@
             put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES,
                     timeStamp, PDataType.LONG.toBytes(gp.getByteCount()));
         }
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
-                timeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
-                timeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+        byte[] minKey = tracker.getMinKey(fam);
+        if (minKey != null) {
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
+                    timeStamp, PDataType.VARBINARY.toBytes(minKey));
+        }
+        byte[] maxKey = tracker.getMaxKey(fam);
+        if (maxKey != null) {
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
+                    timeStamp, PDataType.VARBINARY.toBytes(maxKey));
+        }
         // Add our empty column value so queries behave correctly
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp,
                 ByteUtil.EMPTY_BYTE_ARRAY);
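[Editor's note] The midEndIndex/midStartIndex bookkeeping in splitStats() above hinges on Collections.binarySearch returning -(insertionPoint) - 1 on a miss. The standalone check below mirrors that logic with Strings in place of byte[] keys and Bytes.BYTES_COMPARATOR (a simplification, not the Phoenix code path); the values reproduce the GUIDE_POSTS_COUNT assertions in the new StatsCollectorIT.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GuidePostSplitMath {
    public static void main(String[] args) {
        // One guidepost per row "a".."j", as produced by the test's ANALYZE.
        List<String> gps = Arrays.asList("a","b","c","d","e","f","g","h","i","j");

        // Split at "f": an exact guidepost match, so that guidepost is dropped
        // because the new region boundary replaces it.
        int index = Collections.binarySearch(gps, "f");           // 5
        List<String> left = gps.subList(0, index);                // a..e -> GUIDE_POSTS_COUNT 5
        List<String> right = gps.subList(index + 1, gps.size());  // g..j -> GUIDE_POSTS_COUNT 4

        // Split the left daughter at "cj": no match, so binarySearch returns
        // -(insertionPoint) - 1 and both sides keep all their guideposts.
        index = Collections.binarySearch(left, "cj");             // -4
        int mid = -(index + 1);                                   // insertion point 3
        System.out.println(left.subList(0, mid));                 // [a, b, c] -> count 3
        System.out.println(left.subList(mid, left.size()));       // [d, e]    -> count 2
        System.out.println(right);                                // untouched -> count 4
    }
}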
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index a61f39a..12d3938 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -55,6 +55,7 @@ public class UpgradeUtil {
 
     private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
         HBaseAdmin admin = conn.getQueryServices().getAdmin();
+        boolean snapshotCreated = false;
         try {
             if (nSaltBuckets <= 0) {
                 return;
@@ -62,6 +63,7 @@
             logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways. This may take some time - please do not close window.");
             HTableDescriptor desc = admin.getTableDescriptor(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
             admin.snapshot(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
+            snapshotCreated = true;
             admin.disableTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
             admin.deleteTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
             byte[][] splitPoints = SaltingUtil.getSalteByteSplitPoints(nSaltBuckets);
@@ -74,9 +76,19 @@
             throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
         } finally {
             try {
-                admin.close();
-            } catch (IOException e) {
-                logger.warn("Exception while closing admin during pre-split", e);
+                if (snapshotCreated) {
+                    try {
+                        admin.deleteSnapshot(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
+                    } catch (IOException e) {
+                        logger.warn("Exception while deleting SYSTEM.SEQUENCE snapshot during pre-split", e);
+                    }
+                }
+            } finally {
+                try {
+                    admin.close();
+                } catch (IOException e) {
+                    logger.warn("Exception while closing admin during pre-split", e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c9715571/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfce72f..141b61a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,8 +129,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.0</version>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
         </configuration>
       </plugin>
       <!--This plugin's configuration is used to store Eclipse m2e settings