PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ddcc5c5e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ddcc5c5e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ddcc5c5e Branch: refs/heads/4.x-HBase-1.0 Commit: ddcc5c5e2a3934fba9351b1bd251f35157ffc079 Parents: 70f7653 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Mon Apr 18 19:44:56 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Mon Apr 18 19:44:56 2016 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/index/LocalIndexIT.java | 18 - .../phoenix/end2end/index/MutableIndexIT.java | 239 ++++++++++- .../DelayedTableResultIteratorFactory.java | 9 +- .../iterate/MockParallelIteratorFactory.java | 3 +- .../regionserver/IndexHalfStoreFileReader.java | 401 +------------------ .../IndexHalfStoreFileReaderGenerator.java | 118 +++--- .../hbase/regionserver/LocalIndexSplitter.java | 37 -- .../LocalIndexStoreFileScanner.java | 254 ++++++++++++ .../phoenix/compile/ListJarsQueryPlan.java | 4 + .../MutatingParallelIteratorFactory.java | 2 +- .../org/apache/phoenix/compile/QueryPlan.java | 2 + .../org/apache/phoenix/compile/ScanRanges.java | 13 +- .../apache/phoenix/compile/TraceQueryPlan.java | 5 + .../coprocessor/BaseScannerRegionObserver.java | 84 +++- .../GroupedAggregateRegionObserver.java | 16 +- .../UngroupedAggregateRegionObserver.java | 8 +- .../apache/phoenix/execute/AggregatePlan.java | 19 +- .../apache/phoenix/execute/BaseQueryPlan.java | 20 +- .../phoenix/execute/ClientAggregatePlan.java | 8 +- .../apache/phoenix/execute/ClientScanPlan.java | 7 +- .../apache/phoenix/execute/CorrelatePlan.java | 8 +- .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../apache/phoenix/execute/HashJoinPlan.java | 8 +- .../execute/LiteralResultIterationPlan.java | 2 +- .../org/apache/phoenix/execute/ScanPlan.java | 9 +- 
.../phoenix/execute/SortMergeJoinPlan.java | 7 +- .../phoenix/execute/TupleProjectionPlan.java | 8 +- .../org/apache/phoenix/execute/UnionPlan.java | 6 +- .../apache/phoenix/execute/UnnestArrayPlan.java | 8 +- .../phoenix/iterate/BaseResultIterators.java | 108 +++-- .../phoenix/iterate/ChunkedResultIterator.java | 50 +-- .../DefaultTableResultIteratorFactory.java | 5 +- .../iterate/ParallelIteratorFactory.java | 5 +- .../phoenix/iterate/ParallelIterators.java | 14 +- .../apache/phoenix/iterate/SerialIterators.java | 10 +- .../phoenix/iterate/SpoolingResultIterator.java | 3 +- .../phoenix/iterate/TableResultIterator.java | 69 +++- .../iterate/TableResultIteratorFactory.java | 3 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 5 + .../phoenix/mapreduce/PhoenixRecordReader.java | 5 +- .../java/org/apache/phoenix/util/ScanUtil.java | 64 +++ .../java/org/apache/phoenix/query/BaseTest.java | 2 +- .../query/ParallelIteratorsSplitTest.java | 9 +- 43 files changed, 1039 insertions(+), 638 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 2d79f36..ef7e686 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -1013,24 +1013,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT { assertEquals(5, regionsOfIndexTable.size()); boolean success = latch1.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS); assertTrue("Timed out waiting for MockedLocalIndexSplitter.preSplitAfterPONR to complete", success); - // Verify the metadata for index is correct. 
- rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName, rs.getString(3)); - assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName+"_2", - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(indexName+"_2", rs.getString(3)); - assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - String query = "SELECT t_id,k1,v1 FROM " + tableName+"2"; - rs = conn1.createStatement().executeQuery("EXPLAIN " + query); - assertEquals("CLIENT PARALLEL " + 1 + "-WAY FULL SCAN OVER " + tableName+"2", - QueryUtil.getExplainPlan(rs)); latch2.countDown(); } finally { conn1.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index add282e..80f1250 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -31,14 +32,25 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; +import 
jline.internal.Log; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -86,8 +98,8 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT { @Parameters(name="localIndex = {0} , transactional = {1}") public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, { true, false }, { true, true } }); } @@ -594,4 +606,227 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT { } } + @Test + public void testSplitDuringIndexScan() throws Exception { + testSplitDuringIndexScan(false); + } + + @Test + public void testSplitDuringIndexReverseScan() throws Exception { + testSplitDuringIndexScan(true); + } + + private void testSplitDuringIndexScan(boolean isReverse) throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(2)); + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + try(Connection conn1 = DriverManager.getConnection(getUrl(), props)){ + String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"}; + HBaseAdmin admin = 
driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + dropTable(admin, conn1); + createTableAndLoadData(conn1, strings, isReverse); + + ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName); + assertTrue(rs.next()); + splitDuringScan(conn1, strings, admin, isReverse); + dropTable(admin, conn1); + } + } + + private void dropTable(HBaseAdmin admin, Connection conn) throws SQLException, IOException { + conn.createStatement().execute("DROP TABLE IF EXISTS "+ tableName); + if(admin.tableExists(tableName)) { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); + } + if(admin.tableExists(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName)) { + admin.disableTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName); + admin.deleteTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName); + } + } + + private void createTableAndLoadData(Connection conn1, String[] strings, boolean isReverse) throws SQLException { + createBaseTable(conn1, tableName, null); + for (int i = 0; i < 26; i++) { + conn1.createStatement().execute( + "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + "," + + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')"); + } + conn1.commit(); + conn1.createStatement().execute( + "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" 
DESC":"")+") include (k3)"); + } + + @Test + public void testIndexHalfStoreFileReader() throws Exception { + Connection conn1 = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + try { + dropTable(admin, conn1); + createBaseTable(conn1, tableName, "('e')"); + conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')")); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); + conn1.commit(); + + String query = "SELECT count(*) FROM " + tableName +" where v1<='z'"; + ResultSet rs = conn1.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + + TableName table = TableName.valueOf(localIndex?tableName: indexName); + TableName indexTable = TableName.valueOf(localIndex?MetaDataUtil.getLocalIndexTableName(tableName): indexName); + admin.flush(indexTable); + boolean merged = false; + // merge regions until 1 left + end: while (true) { + long numRegions = 0; + while (true) { + rs = conn1.createStatement().executeQuery(query); + assertTrue(rs.next()); + System.out.println("Number of rows returned:" + rs.getInt(1)); + assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? + try { + List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); + numRegions = indexRegions.size(); + if (numRegions==1) { + break end; + } + if(!merged) { + List<HRegionInfo> regions = + admin.getTableRegions(localIndex ? 
table : indexTable); + System.out.println("Merging: " + regions.size()); + admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), + regions.get(1).getEncodedNameAsBytes(), false); + merged = true; + Threads.sleep(10000); + } + break; + } catch (Exception ex) { + Log.info(ex); + } + + long waitStartTime = System.currentTimeMillis(); + // wait until merge happened + while (System.currentTimeMillis() - waitStartTime < 10000) { + List<HRegionInfo> regions = admin.getTableRegions(indexTable); + System.out.println("Waiting:" + regions.size()); + if (regions.size() < numRegions) { + break; + } + Threads.sleep(1000); + } + } + } + } finally { + dropTable(admin, conn1); + } + } + + private List<HRegionInfo> mergeRegions(HBaseAdmin admin, List<HRegionInfo> regionsOfUserTable) + throws IOException, InterruptedException { + for (int i = 2; i > 0; i--) { + Threads.sleep(10000); + admin.mergeRegions(regionsOfUserTable.get(0).getEncodedNameAsBytes(), + regionsOfUserTable.get(1).getEncodedNameAsBytes(), false); + regionsOfUserTable = + MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), admin.getConnection(), + TableName.valueOf(localIndex? tableName:indexName), false); + + while (regionsOfUserTable.size() != i) { + Thread.sleep(100); + regionsOfUserTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), + admin.getConnection(), TableName.valueOf(localIndex? 
tableName:indexName), false); + } + assertEquals(i, regionsOfUserTable.size()); + if(localIndex) { + List<HRegionInfo> regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), + admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false); + while (regionsOfIndexTable.size() != i) { + Thread.sleep(100); + regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), + admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false); + } + assertEquals(i, regionsOfIndexTable.size()); + } + } + return regionsOfUserTable; + } + + private List<HRegionInfo> splitDuringScan(Connection conn1, String[] strings, HBaseAdmin admin, boolean isReverse) + throws SQLException, IOException, InterruptedException { + ResultSet rs; + String query = "SELECT t_id,k1,v1 FROM " + tableName; + rs = conn1.createStatement().executeQuery(query); + String[] tIdColumnValues = new String[26]; + String[] v1ColumnValues = new String[26]; + int[] k1ColumnValue = new int[26]; + for (int j = 0; j < 5; j++) { + assertTrue(rs.next()); + tIdColumnValues[j] = rs.getString("t_id"); + k1ColumnValue[j] = rs.getInt("k1"); + v1ColumnValues[j] = rs.getString("V1"); + } + + String[] splitKeys = new String[2]; + splitKeys[0] = strings[4]; + splitKeys[1] = strings[12]; + + int[] splitInts = new int[2]; + splitInts[0] = 22; + splitInts[1] = 4; + List<HRegionInfo> regionsOfUserTable = null; + for(int i = 0; i <=1; i++) { + Threads.sleep(10000); + if(localIndex) { + admin.split(Bytes.toBytes(tableName), + ByteUtil.concat(Bytes.toBytes(splitKeys[i]))); + } else { + admin.split(Bytes.toBytes(indexName), ByteUtil.concat(Bytes.toBytes(splitInts[i]))); + } + Thread.sleep(100); + regionsOfUserTable = + MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), + admin.getConnection(), TableName.valueOf(localIndex?tableName:indexName), + false); + + while 
(regionsOfUserTable.size() != (i+2)) { + Thread.sleep(100); + regionsOfUserTable = + MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), + admin.getConnection(), + TableName.valueOf(localIndex?tableName:indexName), false); + } + assertEquals(i+2, regionsOfUserTable.size()); + } + for (int j = 5; j < 26; j++) { + assertTrue(rs.next()); + tIdColumnValues[j] = rs.getString("t_id"); + k1ColumnValue[j] = rs.getInt("k1"); + v1ColumnValues[j] = rs.getString("V1"); + } + Arrays.sort(tIdColumnValues); + Arrays.sort(v1ColumnValues); + Arrays.sort(k1ColumnValue); + assertTrue(Arrays.equals(strings, tIdColumnValues)); + assertTrue(Arrays.equals(strings, v1ColumnValues)); + for(int i=0;i<26;i++) { + assertEquals(i, k1ColumnValue[i]); + } + assertFalse(rs.next()); + return regionsOfUserTable; + } + + private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException { + String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + + "k3 INTEGER,\n" + + "v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" + + (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? 
(" split on " + splits) : ""); + conn.createStatement().execute(ddl); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java index 74deb71..55bed91 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; @@ -39,13 +40,13 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { - return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold); + CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); } private class DelayedTableResultIterator extends TableResultIterator { - public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { - 
super(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold); + public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java index d8a08e6..b5c5f0f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.schema.PTable; @@ -33,7 +34,7 @@ public class MockParallelIteratorFactory implements ParallelIteratorFactory { @Override public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, - String physicalTableName) throws SQLException { + String physicalTableName, QueryPlan plan) throws SQLException { return new MockResultIterator(String.valueOf(counter.incrementAndGet()), table); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java index 18bd032..d03f772 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java @@ -18,27 +18,18 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Map; -import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.index.IndexMaintainer; /** @@ -47,24 +38,23 @@ import org.apache.phoenix.index.IndexMaintainer; * that sort lowest and 'top' is the second half of the file with keys that sort greater than those * of the bottom half. The top includes the split files midkey, of the key that follows if it does * not exist in the file. - * + * * <p> * This type works in tandem with the {@link Reference} type. This class is used reading while * Reference is used writing. - * + * * <p> * This file is not splitable. Calls to {@link #midkey()} return null. 
*/ public class IndexHalfStoreFileReader extends StoreFile.Reader { - private static final int ROW_KEY_LENGTH = 2; private final boolean top; // This is the key we split around. Its the first possible entry on a row: // i.e. empty column and a timestamp of LATEST_TIMESTAMP. private final byte[] splitkey; private final byte[] splitRow; private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers; - private final byte[][] viewConstants; + private final byte[][] viewConstants; private final int offset; private final HRegionInfo regionInfo; private final byte[] regionStartKeyInHFile; @@ -73,36 +63,6 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { * @param fs * @param p * @param cacheConf - * @param r - * @param conf - * @param indexMaintainers - * @param viewConstants - * @param regionInfo - * @param regionStartKeyInHFile - * @param splitKey - * @throws IOException - */ - public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf, - final Reference r, final Configuration conf, - final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers, - final byte[][] viewConstants, final HRegionInfo regionInfo, - final byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException { - super(fs, p, cacheConf, conf); - this.splitkey = splitKey == null ? r.getSplitKey() : splitKey; - // Is it top or bottom half? 
- this.top = Reference.isTopFileRegion(r.getFileRegion()); - this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey)); - this.indexMaintainers = indexMaintainers; - this.viewConstants = viewConstants; - this.regionInfo = regionInfo; - this.regionStartKeyInHFile = regionStartKeyInHFile; - this.offset = regionStartKeyInHFile.length; - } - - /** - * @param fs - * @param p - * @param cacheConf * @param in * @param size * @param r @@ -132,356 +92,35 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { this.offset = regionStartKeyInHFile.length; } - protected boolean isTop() { - return this.top; + public int getOffset() { + return offset; } - @Override - public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread, - final boolean isCompaction) { - final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction); - return new HFileScanner() { - final HFileScanner delegate = s; - public boolean atEnd = false; - - public ByteBuffer getKey() { - if (atEnd) { - return null; - } - boolean changeBottomKeys = - regionInfo.getStartKey().length == 0 && splitRow.length != offset; - if (!top) { - // For first region we are prepending empty byte array of length region end key. - // So if split row length is not equal to region end key length then we need to - // replace empty bytes of split row length. Because after split end key is the split - // row. 
- if(!changeBottomKeys) return delegate.getKey(); - } - // If it is top store file replace the StartKey of the Key with SplitKey - return getChangedKey(delegate.getKeyValue(), changeBottomKeys); - } - - private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) { - // new KeyValue(row, family, qualifier, timestamp, type, value) - byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys); - KeyValue newKv = - new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(), - kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), - kv.getQualifierOffset(), kv.getQualifierLength(), - kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0); - ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey()); - return keyBuffer; - } - - private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) { - int lenOfRemainingKey = kv.getRowLength() - offset; - byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length]; - System.arraycopy(changeBottomKeys ? 
new byte[splitRow.length] : splitRow, 0, - keyReplacedStartKey, 0, splitRow.length); - System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey, - splitRow.length, lenOfRemainingKey); - return keyReplacedStartKey; - } - - public String getKeyString() { - if (atEnd) { - return null; - } - return Bytes.toStringBinary(getKey()); - } - - public ByteBuffer getValue() { - if (atEnd) { - return null; - } - return delegate.getValue(); - } - - public String getValueString() { - if (atEnd) { - return null; - } - return Bytes.toStringBinary(getValue()); - } - - public Cell getKeyValue() { - if (atEnd) { - return null; - } - Cell kv = delegate.getKeyValue(); - boolean changeBottomKeys = - regionInfo.getStartKey().length == 0 && splitRow.length != offset; - if (!top) { - if(!changeBottomKeys) return kv; - } - // If it is a top store file change the StartKey with SplitKey in Key - // and produce the new value corresponding to the change in key - byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys); - KeyValue changedKv = - new KeyValue(changedKey, 0, changedKey.length, kv.getFamilyArray(), - kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), - kv.getQualifierOffset(), kv.getQualifierLength(), - kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), - kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), - kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength()); - return changedKv; - } - - public boolean next() throws IOException { - if (atEnd) { - return false; - } - while (true) { - boolean b = delegate.next(); - if (!b) { - atEnd = true; - return b; - } - // We need to check whether the current KV pointed by this reader is - // corresponding to - // this split or not. 
- // In case of top store file if the ActualRowKey >= SplitKey - // In case of bottom store file if the ActualRowKey < Splitkey - if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) { - return true; - } - } - } - - public boolean seekBefore(byte[] key) throws IOException { - return seekBefore(key, 0, key.length); - } - - public boolean seekBefore(byte[] key, int offset, int length) throws IOException { - - if (top) { - byte[] fk = getFirstKey(); - // This will be null when the file is empty in which we can not seekBefore to - // any key - if (fk == null) { - return false; - } - if (getComparator().compare(key, offset, length, fk, 0, fk.length) <= 0) { - return false; - } - KeyValue replacedKey = getKeyPresentInHFiles(key); - return this.delegate.seekBefore(replacedKey); - } else { - // The equals sign isn't strictly necessary just here to be consistent with - // seekTo - if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) { - return this.delegate.seekBefore(splitkey, 0, splitkey.length); - } - } - return this.delegate.seekBefore(key, offset, length); - } - - @Override - public boolean seekBefore(Cell cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); - } - - public boolean seekTo() throws IOException { - boolean b = delegate.seekTo(); - if (!b) { - atEnd = true; - return b; - } - while (true) { - // We need to check the first occurrence of satisfying the condition - // In case of top store file if the ActualRowKey >= SplitKey - // In case of bottom store file if the ActualRowKey < Splitkey - if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) { - return true; - } - b = delegate.next(); - if (!b) { - return b; - } - } - } - - public int seekTo(byte[] key) throws IOException { - return seekTo(key, 0, key.length); - } - - public int seekTo(byte[] key, int offset, int length) throws IOException { - if (top) { - if 
(getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) { - return -1; - } - KeyValue replacedKey = getKeyPresentInHFiles(key); - - int seekTo = - delegate.seekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(), - replacedKey.getKeyLength()); - return seekTo; - /* - * if (seekTo == 0 || seekTo == -1) { return seekTo; } else if (seekTo == 1) { - * boolean next = this.next(); } - */ - } else { - if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) { - // we would place the scanner in the second half. - // it might be an error to return false here ever... - boolean res = delegate.seekBefore(splitkey, 0, splitkey.length); - if (!res) { - throw new IOException( - "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)"); - } - return 1; - } - } - return delegate.seekTo(key, offset, length); - } - - @Override - public int seekTo(Cell cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); - } - - public int reseekTo(byte[] key) throws IOException { - return reseekTo(key, 0, key.length); - } - - public int reseekTo(byte[] key, int offset, int length) throws IOException { - if (top) { - if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) { - return -1; - } - KeyValue replacedKey = getKeyPresentInHFiles(key); - return delegate.reseekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(), - replacedKey.getKeyLength()); - } else { - if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) { - // we would place the scanner in the second half. - // it might be an error to return false here ever... 
- boolean res = delegate.seekBefore(splitkey, 0, splitkey.length); - if (!res) { - throw new IOException( - "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)"); - } - return 1; - } - } - return delegate.reseekTo(key, offset, length); - } - - @Override - public int reseekTo(Cell cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); - } - - public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() { - return this.delegate.getReader(); - } - - // TODO: Need to change as per IndexHalfStoreFileReader - public boolean isSeeked() { - return this.delegate.isSeeked(); - } - - // Added for compatibility with HBASE-13109 - // Once we drop support for older versions, add an @override annotation here - // and figure out how to get the next indexed key - public Cell getNextIndexedKey() { - return null; // indicate that we cannot use the optimization - } - }; + public byte[][] getViewConstants() { + return viewConstants; } - private boolean isSatisfiedMidKeyCondition(Cell kv) { - if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) { - // In case of a Delete type KV, let it be going to both the daughter regions. - // No problems in doing so. In the correct daughter region where it belongs to, this delete - // tomb will really delete a KV. In the other it will just hang around there with no actual - // kv coming for which this is a delete tomb. 
:) - return true; - } - ImmutableBytesWritable rowKey = - new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset, - kv.getRowLength() - offset); - Entry<ImmutableBytesWritable, IndexMaintainer> entry = indexMaintainers.entrySet().iterator().next(); - IndexMaintainer indexMaintainer = entry.getValue(); - byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey); - IndexMaintainer actualIndexMaintainer = indexMaintainers.get(new ImmutableBytesWritable(viewIndexId)); - byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, this.viewConstants); - int compareResult = Bytes.compareTo(dataRowKey, splitRow); - if (top) { - if (compareResult >= 0) { - return true; - } - } else { - if (compareResult < 0) { - return true; - } - } - return false; + public Map<ImmutableBytesWritable, IndexMaintainer> getIndexMaintainers() { + return indexMaintainers; } - /** - * In case of top half store, the passed key will be with the start key of the daughter region. - * But in the actual HFiles, the key will be with the start key of the old parent region. In - * order to make the real seek in the HFiles, we need to build the old key. - * - * The logic here is just replace daughter region start key with parent region start key - * in the key part. - * - * @param key - * - */ - private KeyValue getKeyPresentInHFiles(byte[] key) { - KeyValue keyValue = new KeyValue(key); - int rowLength = keyValue.getRowLength(); - int rowOffset = keyValue.getRowOffset(); - - int daughterStartKeyLength = - regionInfo.getStartKey().length == 0 ? 
regionInfo.getEndKey().length : regionInfo - .getStartKey().length; - - // This comes incase of deletefamily - if (top - && 0 == keyValue.getValueLength() - && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP - && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(), - keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0 - && CellUtil.isDeleteFamily(keyValue)) { - KeyValue createFirstDeleteFamilyOnRow = - KeyValueUtil.createFirstDeleteFamilyOnRow(regionStartKeyInHFile, - keyValue.getFamily()); - return createFirstDeleteFamilyOnRow; - } + public HRegionInfo getRegionInfo() { + return regionInfo; + } - short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + offset); - byte[] replacedKey = - new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH]; - System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH); - System.arraycopy(regionStartKeyInHFile, 0, replacedKey, ROW_KEY_LENGTH, offset); - System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength, - replacedKey, offset + ROW_KEY_LENGTH, keyValue.getRowLength() - - daughterStartKeyLength); - System.arraycopy(key, rowOffset + rowLength, replacedKey, - offset + keyValue.getRowLength() - daughterStartKeyLength - + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength)); - return KeyValue.createKeyValueFromKey(replacedKey); + public byte[] getRegionStartKeyInHFile() { + return regionStartKeyInHFile; } - @Override - public byte[] midkey() throws IOException { - // Returns null to indicate file is not splitable. 
- return null; + public byte[] getSplitkey() { + return splitkey; } - @Override - public byte[] getFirstKey() { - return super.getFirstKey(); + public byte[] getSplitRow() { + return splitRow; } - @Override - public boolean passesKeyRangeFilter(Scan scan) { - return true; + public boolean isTop() { + return top; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index 1284dcf..7bd474c 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,11 +53,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.parse.AlterIndexStatement; import org.apache.phoenix.parse.ParseNodeFactory; -import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; @@ -116,7 +115,17 @@ public class 
IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { // We need not change any thing in first region data because first region start key // is equal to merged region start key. So returning same reader. if (Bytes.compareTo(mergeRegions.getFirst().getStartKey(), splitRow) == 0) { - return reader; + if (mergeRegions.getFirst().getStartKey().length == 0 + && region.getRegionInfo().getEndKey().length != mergeRegions + .getFirst().getEndKey().length) { + childRegion = mergeRegions.getFirst(); + regionStartKeyInHFile = + mergeRegions.getFirst().getStartKey().length == 0 ? new byte[mergeRegions + .getFirst().getEndKey().length] : mergeRegions.getFirst() + .getStartKey(); + } else { + return reader; + } } else { childRegion = mergeRegions.getSecond(); regionStartKeyInHFile = mergeRegions.getSecond().getStartKey(); @@ -169,58 +178,31 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { } return reader; } - + + @SuppressWarnings("deprecation") @Override public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? 
extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { - InternalScanner internalScanner = super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request); - Collection<StoreFile> files = request.getFiles(); - storeFilesCount = 0; - compactedFilesCount = 0; - for(StoreFile file:files) { - if(!file.isReference()) { - return internalScanner; - } + if (!scanType.equals(ScanType.COMPACT_DROP_DELETES) || s != null || !store.hasReferences()) { + return s; } - storeFilesCount = files.size(); - return internalScanner; - } - - @Override - public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, - StoreFile resultFile) throws IOException { - super.postCompact(e, store, resultFile); - if(storeFilesCount > 0) compactedFilesCount++; - if(compactedFilesCount == storeFilesCount) { - PhoenixConnection conn = null; - try { - conn = QueryUtil.getConnection(e.getEnvironment().getConfiguration()).unwrap( - PhoenixConnection.class); - MetaDataClient client = new MetaDataClient(conn); - String userTableName = MetaDataUtil.getUserTableName(e.getEnvironment().getRegion().getTableDesc().getNameAsString()); - PTable dataTable = PhoenixRuntime.getTable(conn, userTableName); - List<PTable> indexes = dataTable.getIndexes(); - for (PTable index : indexes) { - if (index.getIndexType() == IndexType.LOCAL) { - AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null, - org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())), - dataTable.getTableName().getString(), false, PIndexState.ACTIVE); - client.alterIndex(indexStatement); - } - } - conn.commit(); - } catch (ClassNotFoundException ex) { - } catch (SQLException ex) { - } finally { - if (conn != null) { - try { - conn.close(); - } catch (SQLException ex) { - } - } + List<StoreFileScanner> newScanners = new 
ArrayList<StoreFileScanner>(scanners.size()); + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + for(KeyValueScanner scanner: scanners) { + Reader reader = ((StoreFileScanner) scanner).getReader(); + if (reader instanceof IndexHalfStoreFileReader) { + newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner( + scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader() + .hasMVCCInfo(), store.getSmallestReadPoint())); + } else { + newScanners.add(((StoreFileScanner) scanner)); } } + return new StoreScanner(store, store.getScanInfo(), scan, newScanners, + scanType, store.getSmallestReadPoint(), earliestPutTs); } private byte[][] getViewConstants(PTable dataTable) { @@ -253,4 +235,42 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { } return viewConstants; } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final NavigableSet<byte[]> targetCols, + final KeyValueScanner s) throws IOException { + if(store.hasReferences()) { + long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()); + boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + Collection<StoreFile> storeFiles = store.getStorefiles(); + List<StoreFile> nonReferenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size()); + List<StoreFile> referenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size()); + List<KeyValueScanner> keyValueScanners = new ArrayList<KeyValueScanner>(store.getStorefiles().size()+1); + for(StoreFile storeFile : storeFiles) { + if (storeFile.isReference()) { + referenceStoreFiles.add(storeFile); + } else { + nonReferenceStoreFiles.add(storeFile); + } + } 
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt); + keyValueScanners.addAll(scanners); + for(StoreFile sf : referenceStoreFiles) { + if(sf.getReader() instanceof IndexHalfStoreFileReader) { + keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader() + .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf + .getReader().getHFileReader().hasMVCCInfo(), readPt)); + } else { + keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader() + .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf + .getReader().getHFileReader().hasMVCCInfo(), readPt)); + } + } + keyValueScanners.addAll(((HStore)store).memstore.getScanners(readPt)); + if(!scan.isReversed()) return new StoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners); + return new ReversedStoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners); + } + return s; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java index 2547eca..a418d0e 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java @@ -31,23 +31,14 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.phoenix.hbase.index.util.VersionUtil; -import org.apache.phoenix.jdbc.PhoenixConnection; -import 
org.apache.phoenix.parse.AlterIndexStatement; import org.apache.phoenix.parse.ParseNodeFactory; -import org.apache.phoenix.schema.MetaDataClient; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.sql.SQLException; import java.util.List; public class LocalIndexSplitter extends BaseRegionObserver { @@ -143,34 +134,6 @@ public class LocalIndexSplitter extends BaseRegionObserver { throws IOException { if (st == null || daughterRegions == null) return; RegionCoprocessorEnvironment environment = ctx.getEnvironment(); - PhoenixConnection conn = null; - try { - conn = QueryUtil.getConnection(ctx.getEnvironment().getConfiguration()).unwrap( - PhoenixConnection.class); - MetaDataClient client = new MetaDataClient(conn); - String userTableName = ctx.getEnvironment().getRegion().getTableDesc().getNameAsString(); - PTable dataTable = PhoenixRuntime.getTable(conn, userTableName); - List<PTable> indexes = dataTable.getIndexes(); - for (PTable index : indexes) { - if (index.getIndexType() == IndexType.LOCAL) { - AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null, - org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())), - dataTable.getTableName().getString(), false, PIndexState.INACTIVE); - client.alterIndex(indexStatement); - } - } - conn.commit(); - } catch (ClassNotFoundException ex) { - } catch (SQLException ex) { - } finally { - if (conn != null) { - try { - conn.close(); - } catch (SQLException ex) { - } - } - } - HRegionServer rs = (HRegionServer) 
environment.getRegionServerServices(); st.stepsAfterPONR(rs, rs, daughterRegions); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java new file mode 100644 index 0000000..a6e5005 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.index.IndexMaintainer; + +import static org.apache.hadoop.hbase.KeyValue.ROW_LENGTH_SIZE; + +public class LocalIndexStoreFileScanner extends StoreFileScanner{ + + private IndexHalfStoreFileReader reader; + private boolean changeBottomKeys; + public LocalIndexStoreFileScanner(Reader reader, HFileScanner hfs, boolean useMVCC, + boolean hasMVCC, long readPt) { + super(reader, hfs, useMVCC, hasMVCC, readPt); + this.reader = ((IndexHalfStoreFileReader)super.getReader()); + this.changeBottomKeys = + this.reader.getRegionInfo().getStartKey().length == 0 + && this.reader.getSplitRow().length != this.reader.getOffset(); + } + + @Override + public Cell next() throws IOException { + Cell next = super.next(); + while(next !=null && !isSatisfiedMidKeyCondition(next)) { + next = super.next(); + } + while(super.peek() != null && !isSatisfiedMidKeyCondition(super.peek())) { + super.next(); + } + if (next!=null && (reader.isTop() || changeBottomKeys)) { + next = getChangedKey(next, !reader.isTop() && changeBottomKeys); + } + return next; + } + + @Override + public Cell peek() { + Cell peek = super.peek(); + if (peek != null && (reader.isTop() || changeBottomKeys)) { + peek = getChangedKey(peek, !reader.isTop() && changeBottomKeys); + } + return peek; + } + + private KeyValue getChangedKey(Cell next, boolean changeBottomKeys) { + // If it is a top store file change the StartKey with SplitKey in Key + //and produce 
the new value corresponding to the change in key + byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(next, changeBottomKeys); + KeyValue changedKv = + new KeyValue(changedKey, 0, changedKey.length, next.getFamilyArray(), + next.getFamilyOffset(), next.getFamilyLength(), next.getQualifierArray(), + next.getQualifierOffset(), next.getQualifierLength(), + next.getTimestamp(), Type.codeToType(next.getTypeByte()), + next.getValueArray(), next.getValueOffset(), next.getValueLength(), + next.getTagsArray(), next.getTagsOffset(), next.getTagsLength()); + return changedKv; + } + + @Override + public boolean seek(Cell key) throws IOException { + return seekOrReseek(key, true); + } + + @Override + public boolean reseek(Cell key) throws IOException { + return seekOrReseek(key, false); + } + + @Override + public boolean seekToPreviousRow(Cell key) throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(key); + if (reader.isTop()) { + byte[] fk = reader.getFirstKey(); + // This will be null when the file is empty in which we can not seekBefore to + // any key + if (fk == null) { + return false; + } + if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), fk, 0, fk.length) <= 0) { + return super.seekToPreviousRow(key); + } + KeyValue replacedKey = getKeyPresentInHFiles(kv.getBuffer()); + boolean seekToPreviousRow = super.seekToPreviousRow(replacedKey); + while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) { + seekToPreviousRow = super.seekToPreviousRow(super.peek()); + } + return seekToPreviousRow; + } else { + // The equals sign isn't strictly necessary just here to be consistent with + // seekTo + if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) { + boolean seekToPreviousRow = super.seekToPreviousRow(kv); + while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) { + seekToPreviousRow = 
super.seekToPreviousRow(super.peek()); + } + return seekToPreviousRow; + } + } + boolean seekToPreviousRow = super.seekToPreviousRow(kv); + while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) { + seekToPreviousRow = super.seekToPreviousRow(super.peek()); + } + return seekToPreviousRow; + } + + @Override + public boolean seekToLastRow() throws IOException { + boolean seekToLastRow = super.seekToLastRow(); + while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) { + seekToLastRow = super.seekToPreviousRow(super.peek()); + } + return seekToLastRow; + } + + private boolean isSatisfiedMidKeyCondition(Cell kv) { + if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) { + // In case of a Delete type KV, let it be going to both the daughter regions. + // No problems in doing so. In the correct daughter region where it belongs to, this delete + // tomb will really delete a KV. In the other it will just hang around there with no actual + // kv coming for which this is a delete tomb. 
:) + return true; + } + ImmutableBytesWritable rowKey = + new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(), + kv.getRowLength() - reader.getOffset()); + Entry<ImmutableBytesWritable, IndexMaintainer> entry = reader.getIndexMaintainers().entrySet().iterator().next(); + IndexMaintainer indexMaintainer = entry.getValue(); + byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey); + IndexMaintainer actualIndexMaintainer = reader.getIndexMaintainers().get(new ImmutableBytesWritable(viewIndexId)); + byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, reader.getViewConstants()); + int compareResult = Bytes.compareTo(dataRowKey, reader.getSplitRow()); + if (reader.isTop()) { + if (compareResult >= 0) { + return true; + } + } else { + if (compareResult < 0) { + return true; + } + } + return false; + } + + /** + * In case of top half store, the passed key will be with the start key of the daughter region. + * But in the actual HFiles, the key will be with the start key of the old parent region. In + * order to make the real seek in the HFiles, we need to build the old key. + * + * The logic here is just replace daughter region start key with parent region start key + * in the key part. 
+ * + * @param key + * + */ + private KeyValue getKeyPresentInHFiles(byte[] key) { + KeyValue keyValue = new KeyValue(key); + int rowLength = keyValue.getRowLength(); + int rowOffset = keyValue.getRowOffset(); + + short length = (short) (rowLength - reader.getSplitRow().length + reader.getOffset()); + byte[] replacedKey = + new byte[length + key.length - (rowOffset + rowLength) + ROW_LENGTH_SIZE]; + System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_LENGTH_SIZE); + System.arraycopy(reader.getRegionStartKeyInHFile(), 0, replacedKey, ROW_LENGTH_SIZE, reader.getOffset()); + System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + reader.getSplitRow().length, + replacedKey, reader.getOffset() + ROW_LENGTH_SIZE, rowLength + - reader.getSplitRow().length); + System.arraycopy(key, rowOffset + rowLength, replacedKey, + reader.getOffset() + keyValue.getRowLength() - reader.getSplitRow().length + + ROW_LENGTH_SIZE, key.length - (rowOffset + rowLength)); + return new KeyValue.KeyOnlyKeyValue(replacedKey); + } + + /** + * + * @param kv + * @param isSeek pass true for seek, false for reseek. 
+ * @return + * @throws IOException + */ + public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException{ + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue keyToSeek = kv; + if (reader.isTop()) { + if(getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) < 0){ + if(!isSeek && realSeekDone()) { + return true; + } + return seekOrReseekToProperKey(isSeek, keyToSeek); + } + keyToSeek = getKeyPresentInHFiles(kv.getBuffer()); + return seekOrReseekToProperKey(isSeek, keyToSeek); + } else { + if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) { + close(); + return false; + } + if(!isSeek && reader.getRegionInfo().getStartKey().length == 0 && reader.getSplitRow().length > reader.getRegionStartKeyInHFile().length) { + keyToSeek = getKeyPresentInHFiles(kv.getBuffer()); + } + } + return seekOrReseekToProperKey(isSeek, keyToSeek); + } + + private boolean seekOrReseekToProperKey(boolean isSeek, KeyValue kv) + throws IOException { + boolean seekOrReseek = isSeek ? super.seek(kv) : super.reseek(kv); + while (seekOrReseek && super.peek() != null + && !isSatisfiedMidKeyCondition(super.peek())) { + super.next(); + seekOrReseek = super.peek() != null; + } + return seekOrReseek; + } + + private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) { + int lenOfRemainingKey = kv.getRowLength() - reader.getOffset(); + byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + reader.getSplitRow().length]; + System.arraycopy(changeBottomKeys ? 
new byte[reader.getSplitRow().length] : reader.getSplitRow(), 0, + keyReplacedStartKey, 0, reader.getSplitRow().length); + System.arraycopy(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(), keyReplacedStartKey, + reader.getSplitRow().length, lenOfRemainingKey); + return keyReplacedStartKey; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 94736ed..b52e704 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -117,6 +117,10 @@ public class ListJarsQueryPlan implements QueryPlan { } @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan s) throws SQLException { + return iterator(scanGrouper); + } + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { return new ResultIterator() { private RemoteIterator<LocatedFileStatus> listFiles = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index ed421a7..8e63fa9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -53,7 +53,7 @@ public abstract class 
MutatingParallelIteratorFactory implements ParallelIterato abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException; @Override - public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException { + public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException { final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection); MutationState state = mutate(parentContext, iterator, clonedConnection); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index 4dcc134..7722483 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -49,6 +49,8 @@ public interface QueryPlan extends StatementPlan { public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException; + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; + public long getEstimatedSize(); // TODO: change once joins are supported http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java index 719970a..3209391 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -18,7 +18,8 @@ package org.apache.phoenix.compile; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import java.io.IOException; import java.util.ArrayList; @@ -204,7 +205,7 @@ public class ScanRanges { scan.setStopRow(scanRange.getUpperRange()); } - private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) { + public static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) { if (key.length > 0) { byte[] newKey = new byte[key.length + prefixKeyOffset]; int totalKeyOffset = keyOffset + prefixKeyOffset; @@ -213,7 +214,7 @@ public class ScanRanges { } System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset); return newKey; - } + } return key; } @@ -229,7 +230,7 @@ public class ScanRanges { return temp; } - private static byte[] stripPrefix(byte[] key, int keyOffset) { + public static byte[] stripPrefix(byte[] key, int keyOffset) { if (key.length == 0) { return key; } @@ -388,10 +389,6 @@ public class ScanRanges { newScan.setAttribute(SCAN_ACTUAL_START_ROW, scanStartKey); newScan.setStartRow(scanStartKey); newScan.setStopRow(scanStopKey); - if(keyOffset > 0) { - newScan.setAttribute(STARTKEY_OFFSET, Bytes.toBytes(keyOffset)); - } - return newScan; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 54b4eb7..5e0977b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -115,6 +115,11 @@ public class TraceQueryPlan implements QueryPlan { } @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return iterator(scanGrouper); + } + + @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { final PhoenixConnection conn = stmt.getConnection(); if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/ddcc5c5e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 87929e4..076e8c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import 
org.apache.htrace.Trace; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -84,7 +86,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; public static final String VIEW_CONSTANTS = "_ViewConstants"; - public static final String STARTKEY_OFFSET = "_StartKeyOffset"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; @@ -99,6 +100,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS"; public final static String SCAN_OFFSET = "_RowOffset"; + public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix"; + public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). 
Custom annotations @@ -146,6 +149,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString()); throw new DoNotRetryIOException(cause.getMessage(), cause); } + if(isLocalIndex) { + ScanUtil.setupLocalIndexScan(scan, lowerInclusiveRegionKey, upperExclusiveRegionKey); + } } abstract protected boolean isRegionObserverFor(Scan scan); @@ -165,7 +171,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn)); } if (isRegionObserverFor(scan)) { - if (! skipRegionBoundaryCheck(scan)) { + // For local indexes, we need to throw if out of region as we'll get inconsistent + // results otherwise while in other cases, it may just mean our client-side data + // on region boundaries is out of date and can safely be ignored. + if (!skipRegionBoundaryCheck(scan) || ScanUtil.isLocalIndex(scan)) { throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion()); } // Muck with the start/stop row of the scan and set as reversed at the @@ -279,6 +288,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final ImmutableBytesWritable ptr) { return new RegionScanner() { + private boolean hasReferences = checkForReferenceFiles(); + private HRegionInfo regionInfo = c.getEnvironment().getRegionInfo(); + private byte[] actualStartKey = getActualStartKey(); + + // If there are any reference files after a local index region merge, in some cases we might + // get records less than the scan start row key. This will happen when we replace the + // actual region start key with the merge region start key. This method tells whether + // there are any reference files in the region or not.
+ private boolean checkForReferenceFiles() { + if(!ScanUtil.isLocalIndex(scan)) return false; + for (byte[] family : scan.getFamilies()) { + if (c.getEnvironment().getRegion().getStore(family).hasReferences()) { + return true; + } + } + return false; + } + + // Get the actual scan start row of local index. This will be used to compare the row + // key of the results less than scan start row when there are references. + public byte[] getActualStartKey() { + return ScanUtil.isLocalIndex(scan) ? ScanUtil.getActualStartRow(scan, regionInfo) + : null; + } + @Override public boolean next(List<Cell> results) throws IOException { try { @@ -337,6 +371,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { arrayElementCell = result.get(arrayElementCellPosition); } if (ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) { + if(hasReferences && actualStartKey!=null) { + next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result, + null); + if (result.isEmpty()) { + return next; + } + } IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } @@ -367,7 +408,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); arrayElementCell = result.get(arrayElementCellPosition); } - if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { + if (ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) { + if(hasReferences && actualStartKey!=null) { + next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result, + new Integer(limit)); + if (result.isEmpty()) { + return next; + } + } IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } @@ -386,6 +434,36 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } } + 
/** + * When there is a merge in progress while scanning local indexes we might get key values less than the scan start row. + * In that case we need to scan until we get a row key greater than or equal to the scan start key. + * TODO try to fix this case in LocalIndexStoreFileScanner when there is a merge. + */ + private boolean scanTillScanStartRow(final RegionScanner s, + final Set<KeyValueColumnExpression> arrayKVRefs, + final Expression[] arrayFuncRefs, List<Cell> result, + Integer limit) throws IOException { + boolean next = true; + Cell firstCell = result.get(0); + while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) { + result.clear(); + if(limit == null) { + next = s.nextRaw(result); + } else { + next = s.nextRaw(result, limit); + } + if (result.isEmpty()) { + return next; + } + if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { + replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); + } + firstCell = result.get(0); + } + return next; + } + private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, List<Cell> result) { // make a copy of the results array here, as we're modifying it below
