PHOENIX-4544 Update statistics inconsistent behavior
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1b18d347 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1b18d347 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1b18d347 Branch: refs/heads/master Commit: 1b18d3474d2e3deca429374dac60062b48fe1592 Parents: 5aebc96 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Thu Jun 7 11:01:14 2018 -0700 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Thu Jun 7 11:01:14 2018 -0700 ---------------------------------------------------------------------- .../StatisticsCollectionRunTrackerIT.java | 32 +++++++++----- .../UngroupedAggregateRegionObserver.java | 7 +-- .../apache/phoenix/schema/MetaDataClient.java | 8 ++-- .../stats/StatisticsCollectionRunTracker.java | 46 +++++++++++++++++--- .../java/org/apache/phoenix/util/ByteUtil.java | 16 ++++++- 5 files changed, 83 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java index 71c9e01..cdf1fde 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java @@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsEnabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; @@ -60,12 +63,15 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT { StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker.getInstance(new Configuration()); // assert that the region wasn't added to the tracker - assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo)); + assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0"))))); + assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0"))))); // assert that removing the region from the tracker works - assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0"))))); + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0"))))); runUpdateStats(tableName); // assert that after update stats is complete, tracker isn't tracking the region any more - assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0"))))); + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));; } @Test @@ -102,25 +108,28 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT { RegionInfo regionInfo = createTableAndGetRegion(tableName); // simulate stats collection via major compaction by marking the region as compacting in the tracker markRegionAsCompacting(regionInfo); - Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName)); + //there will be no update for local index and a table , so checking 2 * COMPACTION_UPDATE_STATS_ROW_COUNT + Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT * 2, runUpdateStats(tableName)); StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker.getInstance(new Configuration()); // assert that the tracker state was cleared. - assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0"))); + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo,familyMap)); } @Test public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception { String tableName = fullTableName; RegionInfo regionInfo = createTableAndGetRegion(tableName); - markRunningUpdateStats(regionInfo); - Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT, - runUpdateStats(tableName)); + HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0"))); + markRunningUpdateStats(regionInfo,familyMap); + //there will be no update for a table but local index should succeed, so checking 2 * COMPACTION_UPDATE_STATS_ROW_COUNT + assertTrue("Local index stats are not updated!", CONCURRENT_UPDATE_STATS_ROW_COUNT < runUpdateStats(tableName)); // assert that running the concurrent and race-losing update stats didn't clear the region // from the tracker. If the method returned true it means the tracker was still tracking // the region. Slightly counter-intuitive, yes. - assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo,familyMap)); } private void markRegionAsCompacting(RegionInfo regionInfo) { @@ -129,10 +138,10 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT { tracker.addCompactingRegion(regionInfo); } - private void markRunningUpdateStats(RegionInfo regionInfo) { + private void markRunningUpdateStats(RegionInfo regionInfo, HashSet<byte[]> familyMap) { StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker.getInstance(new Configuration()); - tracker.addUpdateStatsCommandRegion(regionInfo); + tracker.addUpdateStatsCommandRegion(regionInfo,familyMap); } private RegionInfo createTableAndGetRegion(String tableName) throws Exception { @@ -140,6 +149,7 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT { String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, KV1 VARCHAR)"; try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement().execute(ddl); + conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_IDX ON " + tableName + "(KV1)"); PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); try (Admin admin = phxConn.getQueryServices().getAdmin()) { List<RegionInfo> tableRegions = admin.getRegions(tn); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7fad5cb..e4fc149 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -72,13 +71,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.security.User; @@ -1148,7 +1145,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver long rowCount = 0; // in case of async, we report 0 as number of rows updated StatisticsCollectionRunTracker statsRunTracker = StatisticsCollectionRunTracker.getInstance(config); - boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo()); + boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet()); if (runUpdateStats) { if (!async) { rowCount = callable.call(); @@ -1267,7 +1264,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } finally { try { - StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo()); + StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet()); statsCollector.close(); } finally { try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index b8cab79..0970f5e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -115,7 +115,6 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Types; import java.util.ArrayList; -import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -136,7 +135,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -1170,7 +1168,11 @@ public class MetaDataClient { // If the table is a view, then we will end up calling update stats // here for all the view indexes on it. We take care of local indexes later. if (index.getIndexType() != IndexType.LOCAL) { - rowCount += updateStatisticsInternal(table.getPhysicalName(), index, updateStatisticsStmt.getProps(), true); + if(table.getType() != PTableType.VIEW){ + rowCount += updateStatisticsInternal(index.getPhysicalName(), index, updateStatisticsStmt.getProps(), true); + }else{ + rowCount += updateStatisticsInternal(table.getPhysicalName(), index, updateStatisticsStmt.getProps(), true); + } } } /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java index 1a928db..a982c85 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ByteUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -38,8 +39,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ public class StatisticsCollectionRunTracker { private static volatile StatisticsCollectionRunTracker INSTANCE; - private final Set<RegionInfo> updateStatsRegions = Collections - .newSetFromMap(new ConcurrentHashMap<RegionInfo, Boolean>()); + private final Set<ColumnFamilyRegionInfo> updateStatsRegions = Collections + .newSetFromMap(new ConcurrentHashMap<ColumnFamilyRegionInfo, Boolean>()); private final Set<RegionInfo> compactingRegions = Collections .newSetFromMap(new ConcurrentHashMap<RegionInfo, Boolean>()); private final ExecutorService executor; @@ -101,18 +102,19 @@ public class StatisticsCollectionRunTracker { /** * @param regionInfo for the region to run UPDATE STATISTICS command on. + * @param familySet * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise. */ - public boolean addUpdateStatsCommandRegion(RegionInfo regionInfo) { - return updateStatsRegions.add(regionInfo); + public boolean addUpdateStatsCommandRegion(RegionInfo regionInfo, Set<byte[]> familySet) { + return updateStatsRegions.add(new ColumnFamilyRegionInfo(regionInfo,familySet)); } /** * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on. * @return true if UPDATE STATISTICS was running on the region, false otherwise. */ - public boolean removeUpdateStatsCommandRegion(RegionInfo regionInfo) { - return updateStatsRegions.remove(regionInfo); + public boolean removeUpdateStatsCommandRegion(RegionInfo regionInfo, Set<byte[]> familySet) { + return updateStatsRegions.remove(new ColumnFamilyRegionInfo(regionInfo,familySet)); } /** @@ -124,4 +126,36 @@ public class StatisticsCollectionRunTracker { return executor.submit(c); } + class ColumnFamilyRegionInfo { + private RegionInfo regionInfo; + private Set<byte[]> familySet; + + public ColumnFamilyRegionInfo(RegionInfo regionInfo, Set<byte[]> familySet) { + this.regionInfo = regionInfo; + this.familySet = familySet; + } + + public RegionInfo getRegionInfo() { + return regionInfo; + } + + public Set<byte[]> getFamilySet() { + return familySet; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { return true; } + if (!(obj instanceof ColumnFamilyRegionInfo)) { return false; } + + ColumnFamilyRegionInfo c = (ColumnFamilyRegionInfo)obj; + return c.getRegionInfo().equals(this.regionInfo) && ByteUtil.match(this.familySet, c.getFamilySet()); + } + + @Override + public int hashCode() { + return this.getRegionInfo().hashCode(); + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index d11f3a2..5a2b624 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -24,8 +24,10 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -579,7 +581,7 @@ public class ByteUtil { } } - public static boolean contains(List<byte[]> keys, byte[] key) { + public static boolean contains(Collection<byte[]> keys, byte[] key) { for (byte[] k : keys) { if (Arrays.equals(k, key)) { return true; } } @@ -592,4 +594,16 @@ public class ByteUtil { } return false; } + + public static boolean match(Set<byte[]> keys, Set<byte[]> keys2) { + if (keys == keys2) return true; + if (keys == null || keys2 == null) return false; + + int size = keys.size(); + if (keys2.size() != size) return false; + for (byte[] k : keys) { + if (!contains(keys2, k)) { return false; } + } + return true; + } }