Revert "PHOENIX-4839 IndexHalfStoreFileReaderGenerator throws NullPointerException(Aman Poonia)"
This reverts commit 501ce1f5ddb01e91e7267f31861b8e5eeb88e1ea. Patch not applicable to HBase-0.98 Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cd7e1210 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cd7e1210 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cd7e1210 Branch: refs/heads/4.14-HBase-0.98 Commit: cd7e12102d7517e288306a1ce9a3488c7afcbc55 Parents: fce9bcf Author: Vincent Poon <vincentp...@apache.org> Authored: Tue Oct 23 11:08:37 2018 -0700 Committer: Vincent Poon <vincentp...@apache.org> Committed: Tue Oct 23 18:38:29 2018 -0700 ---------------------------------------------------------------------- .../regionserver/IndexHalfStoreFileReader.java | 6 - .../IndexHalfStoreFileReaderGenerator.java | 137 +++++++++++++++++-- 2 files changed, 125 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd7e1210/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java index 8bd0d72..d1d12fb 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java @@ -123,10 +123,4 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { public boolean isTop() { return top; } - - @Override - public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) { - return new LocalIndexStoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), true, - getHFileReader().hasMVCCInfo(), readPt); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/cd7e1210/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index 18d7228..037b299 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; + import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,7 +72,7 @@ import org.apache.phoenix.util.RepairUtil; import com.google.common.collect.Lists; public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { - + private static final String LOCAL_INDEX_AUTOMATIC_REPAIR = "local.index.automatic.repair"; public static final Log LOG = LogFactory.getLog(IndexHalfStoreFileReaderGenerator.class); @@ -148,9 +152,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { try { conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap( PhoenixConnection.class); - PTable dataTable = - IndexUtil.getPDataTable(conn, ctx.getEnvironment().getRegion() - .getTableDesc()); + PTable dataTable = IndexUtil.getPDataTable(conn, ctx.getEnvironment().getRegion().getTableDesc()); List<PTable> indexes = dataTable.getIndexes(); Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers = new HashMap<ImmutableBytesWritable, IndexMaintainer>(); @@ -184,12 +186,19 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { return reader; } + @SuppressWarnings("deprecation") @Override - public InternalScanner preCompact( - ObserverContext<RegionCoprocessorEnvironment> c, Store store, - InternalScanner s, ScanType scanType, - CompactionRequest request) throws IOException { + public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, + Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, + long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { if (!IndexUtil.isLocalIndexStore(store)) { return s; } + Scan scan = null; + if (s!=null) { + scan = ((StoreScanner)s).scan; + } else { + scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + } if (!store.hasReferences()) { InternalScanner repairScanner = null; if (request.isMajor() && (!RepairUtil.isLocalIndexStoreFilesConsistent(c.getEnvironment(), store))) { @@ -210,7 +219,23 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { return s; } } - return s; + List<StoreFileScanner> newScanners = new ArrayList<StoreFileScanner>(scanners.size()); + boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + for(KeyValueScanner scanner: scanners) { + Reader reader = ((StoreFileScanner) scanner).getReaderForTesting(); + if (reader instanceof IndexHalfStoreFileReader) { + newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner( + scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader() + .hasMVCCInfo(), store.getSmallestReadPoint())); + } else { + newScanners.add(((StoreFileScanner) scanner)); + } + } + if (s!=null) { + s.close(); + } + return new StoreScanner(store, store.getScanInfo(), scan, newScanners, + scanType, store.getSmallestReadPoint(), earliestPutTs); } private byte[][] getViewConstants(PTable dataTable) { @@ -243,10 +268,10 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { } return viewConstants; } - + /** * @param env - * @param store Local Index store + * @param store Local Index store * @param scan * @param scanType * @param earliestPutTs @@ -276,12 +301,100 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { } return new DataTableLocalIndexRegionScanner(env.getRegion().getScanner(scan), env.getRegion(), maintainers, store.getFamily().getName(),env.getConfiguration()); - + } catch (ClassNotFoundException | SQLException e) { throw new IOException(e); } } + + @Override + public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final NavigableSet<byte[]> targetCols, + final KeyValueScanner s) throws IOException { + if (store.getFamily().getNameAsString() + .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) + && store.hasReferences()) { + final long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel + ()); + if (s!=null) { + s.close(); + } + if (!scan.isReversed()) { + return new StoreScanner(store, store.getScanInfo(), scan, + targetCols, readPt) { + @Override + protected List<KeyValueScanner> getScannersNoCompaction() throws IOException { + if (store.hasReferences()) { + return getLocalIndexScanners(c, store, scan, readPt); + } else { + return super.getScannersNoCompaction(); + } + } + }; + } else { + return new ReversedStoreScanner(store, store.getScanInfo(), scan, + targetCols, readPt) { + @Override + protected List<KeyValueScanner> getScannersNoCompaction() throws IOException { + if (store.hasReferences()) { + return getLocalIndexScanners(c, store, scan, readPt); + } else { + return super.getScannersNoCompaction(); + } + } + }; + } + } + return s; + } + + private List<KeyValueScanner> getLocalIndexScanners(final + ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final Scan scan, final long readPt) throws IOException { + + boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall()); + Collection<StoreFile> storeFiles = store.getStorefiles(); + List<StoreFile> nonReferenceStoreFiles = new ArrayList<>(store.getStorefiles().size()); + List<StoreFile> referenceStoreFiles = new ArrayList<>(store.getStorefiles().size + ()); + final List<KeyValueScanner> keyValueScanners = new ArrayList<>(store + .getStorefiles().size() + 1); + byte[] startKey = c.getEnvironment().getRegionInfo().getStartKey(); + byte[] endKey = c.getEnvironment().getRegionInfo().getEndKey(); + // If the region start key is not the prefix of the scan start row then we can return empty + // scanners. This is possible during merge where one of the child region scan should not return any + // results as we go through merged region. + int prefixLength = + scan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? (startKey.length == 0 ? endKey.length + : startKey.length) : (scan.getStartRow().length - scan.getAttribute(SCAN_START_ROW_SUFFIX).length); + if (Bytes.compareTo(scan.getStartRow(), 0, prefixLength, (startKey.length == 0 ? new byte[endKey.length] : startKey), 0, + startKey.length == 0 ? endKey.length : startKey.length) != 0) { + return keyValueScanners; + } + for (StoreFile storeFile : storeFiles) { + if (storeFile.isReference()) { + referenceStoreFiles.add(storeFile); + } else { + nonReferenceStoreFiles.add(storeFile); + } + } + final List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt); + keyValueScanners.addAll(scanners); + for (StoreFile sf : referenceStoreFiles) { + if (sf.getReader() instanceof IndexHalfStoreFileReader) { + keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader() + .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf + .getReader().getHFileReader().hasMVCCInfo(), readPt)); + } else { + keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader() + .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf + .getReader().getHFileReader().hasMVCCInfo(), readPt)); + } + } + keyValueScanners.addAll(((HStore) store).memstore.getScanners(readPt)); + return keyValueScanners; + } }