PHOENIX-4544 Update statistics inconsistent behavior

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1b18d347
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1b18d347
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1b18d347

Branch: refs/heads/master
Commit: 1b18d3474d2e3deca429374dac60062b48fe1592
Parents: 5aebc96
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Thu Jun 7 11:01:14 2018 -0700
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Thu Jun 7 11:01:14 2018 -0700

----------------------------------------------------------------------
 .../StatisticsCollectionRunTrackerIT.java       | 32 +++++++++-----
 .../UngroupedAggregateRegionObserver.java       |  7 +--
 .../apache/phoenix/schema/MetaDataClient.java   |  8 ++--
 .../stats/StatisticsCollectionRunTracker.java   | 46 +++++++++++++++++---
 .../java/org/apache/phoenix/util/ByteUtil.java  | 16 ++++++-
 5 files changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
index 71c9e01..cdf1fde 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
@@ -25,12 +25,15 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -60,12 +63,15 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         StatisticsCollectionRunTracker tracker =
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
         // assert that the region wasn't already being tracked for either family set, so both adds succeed
-        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));
         // assert that removing the region from the tracker works
-        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));
         runUpdateStats(tableName);
         // assert that after update stats is complete, the tracker isn't tracking the region any more
-        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")))));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("L#0")))));
     }
     
     @Test
@@ -102,25 +108,28 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         RegionInfo regionInfo = createTableAndGetRegion(tableName);
         // simulate stats collection via major compaction by marking the region as compacting in the tracker
         markRegionAsCompacting(regionInfo);
-        Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName));
+        // neither the table's nor the local index's stats get updated, so each run reports the sentinel: expect 2 * COMPACTION_UPDATE_STATS_ROW_COUNT
+        Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT * 2, runUpdateStats(tableName));
         StatisticsCollectionRunTracker tracker =
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
         // assert that the tracker state was cleared.
-        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")));
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo, familyMap));
     }
     
     @Test
     public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception {
         String tableName = fullTableName;
         RegionInfo regionInfo = createTableAndGetRegion(tableName);
-        markRunningUpdateStats(regionInfo);
-        Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT,
-            runUpdateStats(tableName));
+        HashSet<byte[]> familyMap = new HashSet<byte[]>(Arrays.asList(Bytes.toBytes("0")));
+        markRunningUpdateStats(regionInfo, familyMap);
+        // the table's update is blocked, but the local index's should still run, so the row count exceeds CONCURRENT_UPDATE_STATS_ROW_COUNT
+        assertTrue("Local index stats are not updated!", CONCURRENT_UPDATE_STATS_ROW_COUNT < runUpdateStats(tableName));
         
         // assert that running the concurrent and race-losing update stats didn't clear the region
         // from the tracker. If the method returned true it means the tracker was still tracking
         // the region. Slightly counter-intuitive, yes.
-        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo, familyMap));
     }
     
     private void markRegionAsCompacting(RegionInfo regionInfo) {
@@ -129,10 +138,10 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         tracker.addCompactingRegion(regionInfo);
     }
 
-    private void markRunningUpdateStats(RegionInfo regionInfo) {
+    private void markRunningUpdateStats(RegionInfo regionInfo, HashSet<byte[]> familyMap) {
         StatisticsCollectionRunTracker tracker =
                 StatisticsCollectionRunTracker.getInstance(new Configuration());
-        tracker.addUpdateStatsCommandRegion(regionInfo);
+        tracker.addUpdateStatsCommandRegion(regionInfo, familyMap);
     }
 
     private RegionInfo createTableAndGetRegion(String tableName) throws Exception {
@@ -140,6 +149,7 @@ public class StatisticsCollectionRunTrackerIT extends ParallelStatsEnabledIT {
         String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, 
KV1 VARCHAR)";
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(ddl);
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_IDX ON " + tableName + "(KV1)");
             PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
             try (Admin admin = phxConn.getQueryServices().getAdmin()) {
                 List<RegionInfo> tableRegions = admin.getRegions(tn);

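For reference, the contract the updated test exercises can be distilled into a
standalone sketch (the class and driver method here are hypothetical, not part
of the commit): the tracker now keys on the pair of region and column family
set, so the data table's default family "0" and a local index's shadow family
"L#0" count as independent UPDATE STATISTICS runs on the same region.

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;

    // Hypothetical driver, not part of the commit.
    class TrackerKeySketch {
        static void demo(RegionInfo regionInfo) {
            StatisticsCollectionRunTracker tracker =
                    StatisticsCollectionRunTracker.getInstance(new Configuration());
            Set<byte[]> dataFamilies = new HashSet<>(Arrays.asList(Bytes.toBytes("0")));
            Set<byte[]> indexFamilies = new HashSet<>(Arrays.asList(Bytes.toBytes("L#0")));

            // first claim on (region, "0") succeeds
            assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, dataFamilies));
            // same region, different family set: tracked as a separate run
            assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo, indexFamilies));
            // same region and an equal-by-content family set: duplicate, rejected
            assertFalse(tracker.addUpdateStatsCommandRegion(regionInfo,
                    new HashSet<>(Arrays.asList(Bytes.toBytes("0")))));
        }
    }
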
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7fad5cb..e4fc149 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -72,13 +71,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.security.User;
@@ -1148,7 +1145,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         long rowCount = 0; // in case of async, we report 0 as number of rows updated
         StatisticsCollectionRunTracker statsRunTracker =
                 StatisticsCollectionRunTracker.getInstance(config);
-        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo());
+        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet());
         if (runUpdateStats) {
             if (!async) {
                 rowCount = callable.call();
@@ -1267,7 +1264,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     }
                 } finally {
                     try {
-                        StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo());
+                        StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet());
                         statsCollector.close();
                     } finally {
                         try {

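The two hunks above form a claim/release pair around stats collection. A
condensed sketch of that pattern, with the observer's surrounding code
simplified away (the helper class, method, and parameter names here are
hypothetical):

    import java.util.Set;
    import java.util.concurrent.Callable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;

    // Hypothetical condensation of the guard pattern in the hunks above.
    class StatsGuardSketch {
        static long collectIfNotRunning(Configuration config, Region region, Scan scan,
                Callable<Long> statsCollection) throws Exception {
            // the scan's family map says which families this run covers
            Set<byte[]> scannedFamilies = scan.getFamilyMap().keySet();
            StatisticsCollectionRunTracker tracker =
                    StatisticsCollectionRunTracker.getInstance(config);
            // false means UPDATE STATISTICS is already running for these families
            if (!tracker.addUpdateStatsCommandRegion(region.getRegionInfo(), scannedFamilies)) {
                return 0L;
            }
            try {
                return statsCollection.call();
            } finally {
                // always release the (region, family set) claim
                tracker.removeUpdateStatsCommandRegion(region.getRegionInfo(), scannedFamilies);
            }
        }
    }
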
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b8cab79..0970f5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -115,7 +115,6 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -136,7 +135,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -1170,7 +1168,11 @@ public class MetaDataClient {
                 // If the table is a view, then we will end up calling update stats
                 // here for all the view indexes on it. We take care of local indexes later.
                 if (index.getIndexType() != IndexType.LOCAL) {
-                    rowCount += updateStatisticsInternal(table.getPhysicalName(), index, updateStatisticsStmt.getProps(), true);
+                    if (table.getType() != PTableType.VIEW) {
+                        rowCount += updateStatisticsInternal(index.getPhysicalName(), index, updateStatisticsStmt.getProps(), true);
+                    } else {
+                        rowCount += updateStatisticsInternal(table.getPhysicalName(), index, updateStatisticsStmt.getProps(), true);
+                    }
                 }
             }
             /*

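The MetaDataClient change picks which physical table an index's stats are
collected against: a global index on a base table keeps stats under its own
physical name, while a view index's rows live in the view's physical table,
and local indexes are handled later with the data table. A restatement of
that branch as a hypothetical helper (class and method name are illustrative;
the types are Phoenix's PName, PTable, PTableType, and PTable.IndexType):

    import org.apache.phoenix.schema.PName;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.PTable.IndexType;
    import org.apache.phoenix.schema.PTableType;

    // Hypothetical helper restating the branch in the hunk above.
    class StatsTargetSketch {
        static PName statsTarget(PTable table, PTable index) {
            if (index.getIndexType() == IndexType.LOCAL) {
                return null; // local indexes are taken care of later
            }
            // global index on a table: stats go under the index's own physical table;
            // index on a view: its rows live with the view's physical table
            return table.getType() != PTableType.VIEW
                    ? index.getPhysicalName()
                    : table.getPhysicalName();
        }
    }
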
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
index 1a928db..a982c85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ByteUtil;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -38,8 +39,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class StatisticsCollectionRunTracker {
     private static volatile StatisticsCollectionRunTracker INSTANCE;
-    private final Set<RegionInfo> updateStatsRegions = Collections
-            .newSetFromMap(new ConcurrentHashMap<RegionInfo, Boolean>());
+    private final Set<ColumnFamilyRegionInfo> updateStatsRegions = Collections
+            .newSetFromMap(new ConcurrentHashMap<ColumnFamilyRegionInfo, Boolean>());
     private final Set<RegionInfo> compactingRegions = Collections
             .newSetFromMap(new ConcurrentHashMap<RegionInfo, Boolean>());
     private final ExecutorService executor;
@@ -101,18 +102,19 @@ public class StatisticsCollectionRunTracker {
 
     /**
      * @param regionInfo for the region to run UPDATE STATISTICS command on.
+     * @param familySet the column families covered by the command; part of the tracking key.
      * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise.
      */
-    public boolean addUpdateStatsCommandRegion(RegionInfo regionInfo) {
-        return updateStatsRegions.add(regionInfo);
+    public boolean addUpdateStatsCommandRegion(RegionInfo regionInfo, Set<byte[]> familySet) {
+        return updateStatsRegions.add(new ColumnFamilyRegionInfo(regionInfo, familySet));
     }
 
     /**
      * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on.
      * @return true if UPDATE STATISTICS was running on the region, false otherwise.
      */
-    public boolean removeUpdateStatsCommandRegion(RegionInfo regionInfo) {
-        return updateStatsRegions.remove(regionInfo);
+    public boolean removeUpdateStatsCommandRegion(RegionInfo regionInfo, Set<byte[]> familySet) {
+        return updateStatsRegions.remove(new ColumnFamilyRegionInfo(regionInfo, familySet));
     }
 
     /**
@@ -124,4 +126,36 @@ public class StatisticsCollectionRunTracker {
         return executor.submit(c);
     }
 
+    class ColumnFamilyRegionInfo {
+        private RegionInfo regionInfo;
+        private Set<byte[]> familySet;
+
+        public ColumnFamilyRegionInfo(RegionInfo regionInfo, Set<byte[]> familySet) {
+            this.regionInfo = regionInfo;
+            this.familySet = familySet;
+        }
+
+        public RegionInfo getRegionInfo() {
+            return regionInfo;
+        }
+
+        public Set<byte[]> getFamilySet() {
+            return familySet;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) { return true; }
+            if (!(obj instanceof ColumnFamilyRegionInfo)) { return false; }
+
+            ColumnFamilyRegionInfo c = (ColumnFamilyRegionInfo)obj;
+            return c.getRegionInfo().equals(this.regionInfo) && ByteUtil.match(this.familySet, c.getFamilySet());
+        }
+
+        @Override
+        public int hashCode() {
+            return this.getRegionInfo().hashCode();
+        }
+    }
+
 }

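A note on ColumnFamilyRegionInfo above: hashCode intentionally uses only the
RegionInfo. Java byte[] hashes by identity, so letting familySet contribute
to the hash would put logically equal keys in different buckets; equals then
compares the family sets by content via ByteUtil.match (added below). A
standalone illustration of the underlying pitfall (not from the commit):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // byte[] inherits identity-based equals/hashCode from Object, so
    // Set.equals cannot compare Set<byte[]> contents.
    class ByteArraySetPitfall {
        public static void main(String[] args) {
            Set<byte[]> a = new HashSet<>(Arrays.asList("0".getBytes()));
            Set<byte[]> b = new HashSet<>(Arrays.asList("0".getBytes()));
            System.out.println(a.equals(b)); // false: different array instances
            // a content-based walk with Arrays.equals is needed instead,
            // which is exactly what ByteUtil.match provides
        }
    }
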
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1b18d347/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index d11f3a2..5a2b624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -24,8 +24,10 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -579,7 +581,7 @@ public class ByteUtil {
         }
     }
     
-    public static boolean contains(List<byte[]> keys, byte[] key) {
+    public static boolean contains(Collection<byte[]> keys, byte[] key) {
         for (byte[] k : keys) {
             if (Arrays.equals(k, key)) { return true; }
         }
@@ -592,4 +594,16 @@ public class ByteUtil {
         }
         return false;
     }
+
+    public static boolean match(Set<byte[]> keys, Set<byte[]> keys2) {
+        if (keys == keys2) return true;
+        if (keys == null || keys2 == null) return false;
+
+        int size = keys.size();
+        if (keys2.size() != size) return false;
+        for (byte[] k : keys) {
+            if (!contains(keys2, k)) { return false; }
+        }
+        return true;
+    }
 }

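Expected behavior of the new ByteUtil.match helper, as a hypothetical usage
check: the comparison is content-based and order-insensitive, and since
contains() does a linear scan, match() is O(n*m); that is fine for the
handful of column families a region typically carries.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.ByteUtil;

    // Hypothetical usage check, not part of the commit.
    class MatchSketch {
        public static void main(String[] args) {
            Set<byte[]> s1 = new HashSet<>(Arrays.asList(Bytes.toBytes("0"), Bytes.toBytes("L#0")));
            Set<byte[]> s2 = new HashSet<>(Arrays.asList(Bytes.toBytes("L#0"), Bytes.toBytes("0")));
            System.out.println(ByteUtil.match(s1, s2)); // true: same contents, any order
            System.out.println(ByteUtil.match(s1,
                    Collections.singleton(Bytes.toBytes("0")))); // false: sizes differ
        }
    }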