Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 82a4c0e02 -> d81ad6a5f


PHOENIX-3496 Figure out why LocalIndexIT#testLocalIndexRoundTrip is flapping (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d81ad6a5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d81ad6a5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d81ad6a5

Branch: refs/heads/4.x-HBase-0.98
Commit: d81ad6a5fbaa68418dc67d24c6003e18b8543f69
Parents: 82a4c0e
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Wed Sep 6 00:09:15 2017 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Wed Sep 6 00:09:15 2017 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/FlappingLocalIndexIT.java   | 79 +++++++++++++++++++-
 .../phoenix/end2end/index/BaseLocalIndexIT.java |  6 +-
 .../phoenix/end2end/index/LocalIndexIT.java     |  3 +-
 .../UngroupedAggregateRegionObserver.java       | 42 ++++++++++-
 .../phoenix/iterate/BaseResultIterators.java    | 60 ++++++++++++---
 5 files changed, 170 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
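
In short, as reconstructed from the diff below: building a local index races with the region close/reopen that adds the local-index column family, which appears to be what makes LocalIndexIT#testLocalIndexRoundTrip flap. The fix adds a server-side check in UngroupedAggregateRegionObserver that rejects an index-build scan with ColumnFamilyNotFoundException while the family is still missing, and client-side handling in BaseResultIterators that pauses and resubmits such a scan when it carries the LOCAL_INDEX_BUILD attribute. testLocalIndexRoundTrip is re-enabled, and new FlappingLocalIndexIT tests reproduce the race with a delaying test coprocessor.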


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81ad6a5/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
index 7509997..e2f3970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -21,22 +21,31 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -297,4 +306,72 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT {
         indexTable.close();
     }
 
-}
+    @Test
+    public void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+        testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(false);
+    }
+
+    @Test
+    public void testBuildingLocalCoveredIndexShouldHandleNoSuchColumnFamilyException() throws Exception {
+        testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(true);
+    }
+
+    private void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(boolean coveredIndex) throws Exception {
+        String tableName = schemaName + "." + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        String indexTableName = schemaName + "." + indexName;
+        TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
+
+        createBaseTable(tableName, null, null, coveredIndex ? "cf" : null);
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+        conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+        conn1.commit();
+        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        HTableDescriptor tableDescriptor = admin.getTableDescriptor(physicalTableName);
+        tableDescriptor.addCoprocessor(DeleyOpenRegionObserver.class.getName(), null,
+            QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY - 1, null);
+        admin.disableTable(physicalTableName);
+        admin.modifyTable(physicalTableName, tableDescriptor);
+        admin.enableTable(physicalTableName);
+        DeleyOpenRegionObserver.DELAY_OPEN = true;
+        conn1.createStatement().execute(
+            "CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(k3)"
+                    + (coveredIndex ? " include(cf.v1)" : ""));
+        DeleyOpenRegionObserver.DELAY_OPEN = false;
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+    }
+
+    public static class DeleyOpenRegionObserver extends BaseRegionObserver {
+        public static volatile boolean DELAY_OPEN = false;
+        private int retryCount = 0;
+        private CountDownLatch latch = new CountDownLatch(1);
+        @Override
+        public void
+                preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+                        throws IOException {
+            if(DELAY_OPEN) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e1) {
+                    throw new DoNotRetryIOException(e1);
+                }
+            }
+            super.preClose(c, abortRequested);
+        }
+
+        @Override
+        public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+                Scan scan, RegionScanner s) throws IOException {
+            if(DELAY_OPEN && retryCount == 1) {
+                latch.countDown();
+            }
+            retryCount++;
+            return super.preScannerOpen(e, scan, s);
+        }
+    }
+}
\ No newline at end of file
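
A note on the test mechanics above: DeleyOpenRegionObserver is installed on the physical data table, and while DELAY_OPEN is set it parks preClose() on a latch that is released only once preScannerOpen() has seen a retried scan (retryCount == 1). That appears to hold up the region close/reopen triggered by CREATE LOCAL INDEX long enough for the index-build scan to hit a region that does not yet have the local-index column family. A stand-alone sketch of the latch pattern, stripped of the HBase coprocessor types (illustrative only, not part of the commit):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    // One hook blocks on a latch; another releases it once the event of
    // interest (here the second scanner open, i.e. the first retry) occurs.
    final class GateSketch {
        private final CountDownLatch gate = new CountDownLatch(1);
        private final AtomicInteger scannerOpens = new AtomicInteger();

        // Mirrors preClose(): hold the region close until released.
        void onClose() throws InterruptedException {
            gate.await();
        }

        // Mirrors preScannerOpen(): release once a retried scan arrives.
        void onScannerOpen() {
            if (scannerOpens.incrementAndGet() == 2) {
                gate.countDown();
            }
        }
    }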

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81ad6a5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
index 547878c..30baec4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
@@ -69,6 +69,10 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     protected void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException {
+        createBaseTable(tableName, saltBuckets, splits, null);
+    }
+
+    protected void createBaseTable(String tableName, Integer saltBuckets, String splits, String cf) throws SQLException {
         Connection conn = getConnection();
         if (isNamespaceMapped) {
             conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
@@ -77,7 +81,7 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT {
                 "k1 INTEGER NOT NULL,\n" +
                 "k2 INTEGER NOT NULL,\n" +
                 "k3 INTEGER,\n" +
-                "v1 VARCHAR,\n" +
+                (cf != null ? (cf+'.') : "") + "v1 VARCHAR,\n" +
                 "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
                         + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : ""
                         + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));

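Usage note on the new createBaseTable() overload: passing a non-null cf (the new tests pass "cf") declares the value column as cf.v1 VARCHAR instead of leaving v1 in the default column family, which the covered-index variant of the new FlappingLocalIndexIT test relies on for its include(cf.v1) clause.
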
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81ad6a5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index c0e5c8f..26ddad5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -75,8 +75,7 @@ public class LocalIndexIT extends BaseLocalIndexIT {
         super(isNamespaceMapped);
     }
     
-    @Ignore
-    //FIXME: PHOENIX-3496 
+    @Test
     public void testLocalIndexRoundTrip() throws Exception {
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81ad6a5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 8fbcde6..acf1e17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -90,6 +92,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
@@ -121,6 +124,7 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TimeKeeper;
@@ -274,10 +278,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throws IOException {
         s = super.preScannerOpen(e, scan, s);
         if (ScanUtil.isAnalyzeTable(scan)) {
-//            if (!ScanUtil.isLocalIndex(scan)) {
-//                scan.getFamilyMap().clear();
-//            }
-//            scan.getFamilyMap().clear();
             // We are setting the start row and stop row such that it covers the entire region. As part
             // of Phonenix-1263 we are storing the guideposts against the physical table rather than
             // individual tenant specific tables.
@@ -447,6 +447,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                         HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ;
 
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
+        if(buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
             needToWrite = true;
             maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
@@ -790,6 +793,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     }
 
+    private void checkForLocalIndexColumnFamilies(HRegion region,
+            List<IndexMaintainer> indexMaintainers) throws IOException {
+        HTableDescriptor tableDesc = region.getTableDesc();
+        String schemaName =
+                tableDesc.getTableName().getNamespaceAsString()
+                        .equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR) ? SchemaUtil
+                        .getSchemaNameFromFullName(tableDesc.getTableName().getNameAsString())
+                        : tableDesc.getTableName().getNamespaceAsString();
+        String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString());
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns();
+            if(coveredColumns.isEmpty()) {
+                byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get();
+                // When covered columns empty we store index data in default column family so check for it.
+                if (tableDesc.getFamily(localIndexCf) == null) {
+                    ServerUtil.throwIOException("Column Family Not Found",
+                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
+                                .toString(localIndexCf)));
+                }
+            }
+            for (ColumnReference reference : coveredColumns) {
+                byte[] cf = IndexUtil.getLocalIndexColumnFamily(reference.getFamily());
+                HColumnDescriptor family = region.getTableDesc().getFamily(cf);
+                if (family == null) {
+                    ServerUtil.throwIOException("Column Family Not Found",
+                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
+                }
+            }
+        }
+    }
+
     private void commit(HRegion region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize,
             byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto,
             boolean isPKChanging)
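
The server-side half of the fix, in the hunks above: when buildLocalIndex is true, checkForLocalIndexColumnFamilies() resolves every column family the local index will write to (the empty-key-value family when there are no covered columns, otherwise the local-index family of each covered column) and turns a missing family into a ColumnFamilyNotFoundException, surfaced through ServerUtil.throwIOException, that the client can recognize and retry. A generic sketch of that guard, with plain Java types standing in for the Phoenix/HBase ones (illustrative only, not the committed code):

    import java.util.Set;

    final class FamilyGuardSketch {
        // Fail fast with a recognizable error if any column family the index
        // build needs is not yet present on the region.
        static void requireFamilies(Set<String> required, Set<String> present) {
            for (String family : required) {
                if (!present.contains(family)) {
                    throw new IllegalStateException("Column Family Not Found: " + family);
                }
            }
        }
    }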

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81ad6a5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 2b8de53..251908f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
@@ -88,6 +89,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
@@ -921,18 +923,18 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                                 services.clearTableRegionCache(physicalTableName);
                                 context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
-                            // Resubmit just this portion of work again
-                            Scan oldScan = scanPair.getFirst();
-                            byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
-                            byte[] endKey = oldScan.getStopRow();
-                            
-                            List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
-                            // Add any concatIterators that were successful so far
-                            // as we need these to be in order
-                            addIterator(iterators, concatIterators);
-                            concatIterators = Lists.newArrayList();
-                            getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
-                                    maxQueryEndTime, newNestedScans.size(), previousScan);
+                            concatIterators =
+                                    recreateIterators(services, isLocalIndex, allIterators,
+                                        iterators, isReverse, maxQueryEndTime, previousScan,
+                                        concatIterators, scanPair);
+                        } catch(ColumnFamilyNotFoundException cfnfe) {
+                            if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
+                                Thread.sleep(1000);
+                                concatIterators =
+                                        recreateIterators(services, isLocalIndex, allIterators,
+                                            iterators, isReverse, maxQueryEndTime, previousScan,
+                                            clearedCache, concatIterators, scanPairItr, scanPair);
+                            }
                         }
                     }
                 }
@@ -984,6 +986,40 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         }
         return null; // Not reachable
     }
+
+    private List<PeekingResultIterator> recreateIterators(ConnectionQueryServices services,
+            boolean isLocalIndex, Queue<PeekingResultIterator> allIterators,
+            List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime,
+            ScanWrapper previousScan, List<PeekingResultIterator> concatIterators,
+            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException {
+        // Resubmit just this portion of work again
+        Scan oldScan = scanPair.getFirst();
+        byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
+        byte[] endKey = oldScan.getStopRow();
+        
+        List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
+        // Add any concatIterators that were successful so far
+        // as we need these to be in order
+        addIterator(iterators, concatIterators);
+        concatIterators = Lists.newArrayList();
+        getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
+                maxQueryEndTime, newNestedScans.size(), previousScan);
+        return concatIterators;
+    }
+
+    private List<PeekingResultIterator> recreateIterators(ConnectionQueryServices services,
+            boolean isLocalIndex, Queue<PeekingResultIterator> allIterators,
+            List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime,
+            ScanWrapper previousScan, boolean clearedCache,
+            List<PeekingResultIterator> concatIterators,
+            Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr,
+            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException {
+        scanPairItr.remove();
+        concatIterators =
+                recreateIterators(services, isLocalIndex, allIterators, iterators, isReverse,
+                    maxQueryEndTime, previousScan, concatIterators, scanPair);
+        return concatIterators;
+    }
     
 
     @Override
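
The client-side half, in the hunks above: the per-scan failure handling in getIterators() now resubmits work through the new recreateIterators() helpers, and a scan that fails with ColumnFamilyNotFoundException while carrying the LOCAL_INDEX_BUILD attribute is retried after a one-second pause instead of failing the index build. A minimal, generic sketch of that retry shape (class and helper names here are illustrative, not Phoenix API):

    import java.util.concurrent.Callable;

    final class RetryOnceSketch {
        // Run a unit of work; if it fails with the designated retriable error,
        // pause briefly and resubmit just that portion of work once.
        static <T> T callWithOneRetry(Callable<T> work,
                Class<? extends Exception> retriable, long pauseMillis) throws Exception {
            try {
                return work.call();
            } catch (Exception e) {
                if (!retriable.isInstance(e)) {
                    throw e;
                }
                Thread.sleep(pauseMillis); // give the region time to reopen with the new family
                return work.call();
            }
        }
    }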
