PHOENIX-1681 Use the new Region Interface (Andrew Purtell)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/edff624f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/edff624f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/edff624f Branch: refs/heads/json Commit: edff624f193324762fae04907c551e3d2fec93a3 Parents: 7bc9cce Author: Enis Soztutar <e...@apache.org> Authored: Thu May 21 23:22:54 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri May 22 00:16:31 2015 -0700 ---------------------------------------------------------------------- ...ReplayWithIndexWritesAndCompressedWALIT.java | 4 +- .../EndToEndCoveredColumnsIndexBuilderIT.java | 4 +- .../IndexHalfStoreFileReaderGenerator.java | 9 +- .../regionserver/IndexSplitTransaction.java | 65 +++++--------- .../hbase/regionserver/LocalIndexMerger.java | 16 ++-- .../hbase/regionserver/LocalIndexSplitter.java | 11 +-- .../coprocessor/BaseScannerRegionObserver.java | 26 +++--- .../GroupedAggregateRegionObserver.java | 13 +-- .../coprocessor/MetaDataEndpointImpl.java | 94 ++++++++++---------- .../phoenix/coprocessor/ScanRegionObserver.java | 17 ++-- .../coprocessor/SequenceRegionObserver.java | 16 ++-- .../UngroupedAggregateRegionObserver.java | 29 +++--- .../hbase/index/covered/data/LocalTable.java | 5 +- .../write/ParallelWriterIndexCommitter.java | 8 +- .../recovery/PerRegionIndexWriteCache.java | 10 +-- .../recovery/StoreFailuresInCachePolicy.java | 4 +- .../TrackingParallelWriterIndexCommitter.java | 8 +- .../phoenix/index/PhoenixIndexBuilder.java | 4 +- .../apache/phoenix/index/PhoenixIndexCodec.java | 14 ++- .../schema/stats/StatisticsCollector.java | 14 +-- .../phoenix/schema/stats/StatisticsScanner.java | 16 ++-- .../phoenix/schema/stats/StatisticsWriter.java | 16 ++-- .../java/org/apache/phoenix/util/IndexUtil.java | 38 ++++---- .../index/covered/TestLocalTableState.java | 8 +- .../index/write/TestWALRecoveryCaching.java | 17 ++-- .../recovery/TestPerRegionIndexWriteCache.java | 6 +- 26 files changed, 230 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java index 3b8ff29..611ba68 100644 --- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java +++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java @@ -159,7 +159,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { } /** - * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify + * Test writing edits into an region, closing it, splitting logs, opening Region again. Verify * seqids. * @throws Exception on failure */ @@ -183,7 +183,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { builder.build(htd); // create the region + its WAL - HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); + HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); // FIXME: Uses private type region0.close(); region0.getWAL().close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index d90733f..6b2309e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.util.EnvironmentEdge; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -312,7 +312,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT { HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes); // overwrite the codec so we can verify the current state - HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); + Region region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); Indexer indexer = (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName()); CoveredColumnsIndexBuilder builder = http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index 1284dcf..94d5912 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -76,7 +76,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, Reference r, Reader reader) throws IOException { TableName tableName = ctx.getEnvironment().getRegion().getTableDesc().getTableName(); - HRegion region = ctx.getEnvironment().getRegion(); + Region region = ctx.getEnvironment().getRegion(); HRegionInfo childRegion = region.getRegionInfo(); byte[] splitKey = null; if (reader == null && r != null) { @@ -109,7 +109,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor.getRegionsFromMergeQualifier(ctx.getEnvironment() .getRegionServerServices().getConnection(), - region.getRegionName()); + region.getRegionInfo().getRegionName()); if (mergeRegions == null || mergeRegions.getFirst() == null) return reader; byte[] splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(r.getSplitKey())); @@ -121,8 +121,9 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { childRegion = mergeRegions.getSecond(); regionStartKeyInHFile = mergeRegions.getSecond().getStartKey(); } - splitKey = KeyValue.createFirstOnRow(region.getStartKey().length == 0 ? - new byte[region.getEndKey().length] : region.getStartKey()).getKey(); + splitKey = KeyValue.createFirstOnRow(region.getRegionInfo().getStartKey().length == 0 ? + new byte[region.getRegionInfo().getEndKey().length] : + region.getRegionInfo().getStartKey()).getKey(); } else { HRegionInfo parentRegion = HRegionInfo.getHRegionInfo(result); regionStartKeyInHFile = http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java index 3057a14..71bc520 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java @@ -65,31 +65,8 @@ import org.apache.zookeeper.data.Stat; import com.google.common.util.concurrent.ThreadFactoryBuilder; -/** - * Executes region split as a "transaction". Call {@link #prepare()} to setup - * the transaction, {@link #execute(Server, RegionServerServices)} to run the - * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails. - * - * <p>Here is an example of how you would use this class: - * <pre> - * SplitTransaction st = new SplitTransaction(this.conf, parent, midKey) - * if (!st.prepare()) return; - * try { - * st.execute(server, services); - * } catch (IOException ioe) { - * try { - * st.rollback(server, services); - * return; - * } catch (RuntimeException e) { - * myAbortable.abort("Failed split, abort"); - * } - * } - * </Pre> - * <p>This class is not thread safe. Caller needs ensure split is run by - * one thread only. - */ @InterfaceAudience.Private -public class IndexSplitTransaction extends SplitTransaction { +public class IndexSplitTransaction extends SplitTransactionImpl { // FIXME: Extends private type private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class); /* @@ -154,9 +131,9 @@ public class IndexSplitTransaction extends SplitTransaction { * @param r Region to split * @param splitrow Row to split around */ - public IndexSplitTransaction(final HRegion r, final byte [] splitrow) { + public IndexSplitTransaction(final Region r, final byte [] splitrow) { super(r , splitrow); - this.parent = r; + this.parent = (HRegion)r; this.splitrow = splitrow; } @@ -217,7 +194,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @return Regions created */ @Override - /* package */PairOfSameType<HRegion> createDaughters(final Server server, + /* package */PairOfSameType<Region> createDaughters(final Server server, final RegionServerServices services) throws IOException { LOG.info("Starting split of region " + this.parent); if ((server != null && server.isStopped()) || @@ -244,14 +221,14 @@ public class IndexSplitTransaction extends SplitTransaction { server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout); - PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing); + PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing); List<Mutation> metaEntries = new ArrayList<Mutation>(); if (this.parent.getCoprocessorHost() != null) { if (this.parent.getCoprocessorHost(). preSplitBeforePONR(this.splitrow, metaEntries)) { throw new IOException("Coprocessor bypassing region " - + this.parent.getRegionNameAsString() + " split."); + + this.parent.getRegionInfo().getRegionNameAsString() + " split."); } try { for (Mutation p : metaEntries) { @@ -303,7 +280,7 @@ public class IndexSplitTransaction extends SplitTransaction { } @Override - public PairOfSameType<HRegion> stepsBeforePONR(final Server server, + public PairOfSameType<Region> stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't // have zookeeper so don't do zk stuff if server or zookeeper is null @@ -313,7 +290,7 @@ public class IndexSplitTransaction extends SplitTransaction { parent.getRegionInfo(), server.getServerName(), hri_a, hri_b); } catch (KeeperException e) { throw new IOException("Failed creating PENDING_SPLIT znode on " + - this.parent.getRegionNameAsString(), e); + this.parent.getRegionInfo().getRegionNameAsString(), e); } } this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); @@ -367,12 +344,12 @@ public class IndexSplitTransaction extends SplitTransaction { // stuff in fs that needs cleanup -- a storefile or two. Thats why we // add entry to journal BEFORE rather than AFTER the change. this.journal.add(JournalEntry.STARTED_REGION_A_CREATION); - HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a); + Region a = this.parent.createDaughterRegionFromSplits(this.hri_a); // Ditto this.journal.add(JournalEntry.STARTED_REGION_B_CREATION); - HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b); - return new PairOfSameType<HRegion>(a, b); + Region b = this.parent.createDaughterRegionFromSplits(this.hri_b); + return new PairOfSameType<Region>(a, b); } /** @@ -387,7 +364,7 @@ public class IndexSplitTransaction extends SplitTransaction { */ @Override /* package */void openDaughters(final Server server, - final RegionServerServices services, HRegion a, HRegion b) + final RegionServerServices services, Region a, Region b) throws IOException { boolean stopped = server != null && server.isStopped(); boolean stopping = services != null && services.isStopping(); @@ -400,8 +377,8 @@ public class IndexSplitTransaction extends SplitTransaction { " because stopping=" + stopping + ", stopped=" + stopped); } else { // Open daughters in parallel. - DaughterOpener aOpener = new DaughterOpener(server, a); - DaughterOpener bOpener = new DaughterOpener(server, b); + DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a); + DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b); aOpener.start(); bOpener.start(); try { @@ -444,7 +421,7 @@ public class IndexSplitTransaction extends SplitTransaction { * Call {@link #rollback(Server, RegionServerServices)} */ /* package */void transitionZKNode(final Server server, - final RegionServerServices services, HRegion a, HRegion b) + final RegionServerServices services, Region a, Region b) throws IOException { // Tell master about split by updating zk. If we fail, abort. if (server != null && server.getZooKeeper() != null) { @@ -556,7 +533,7 @@ public class IndexSplitTransaction extends SplitTransaction { Thread.currentThread().interrupt(); } throw new IOException("Failed getting SPLITTING znode on " - + parent.getRegionNameAsString(), e); + + parent.getRegionInfo().getRegionNameAsString(), e); } } @@ -572,10 +549,10 @@ public class IndexSplitTransaction extends SplitTransaction { * @see #rollback(Server, RegionServerServices) */ @Override - public PairOfSameType<HRegion> execute(final Server server, + public PairOfSameType<Region> execute(final Server server, final RegionServerServices services) throws IOException { - PairOfSameType<HRegion> regions = createDaughters(server, services); + PairOfSameType<Region> regions = createDaughters(server, services); if (this.parent.getCoprocessorHost() != null) { this.parent.getCoprocessorHost().preSplitAfterPONR(); } @@ -583,8 +560,8 @@ public class IndexSplitTransaction extends SplitTransaction { } @Override - public PairOfSameType<HRegion> stepsAfterPONR(final Server server, - final RegionServerServices services, PairOfSameType<HRegion> regions) + public PairOfSameType<Region> stepsAfterPONR(final Server server, + final RegionServerServices services, PairOfSameType<Region> regions) throws IOException { openDaughters(server, services, regions.getFirst(), regions.getSecond()); transitionZKNode(server, services, regions.getFirst(), regions.getSecond()); @@ -871,7 +848,7 @@ public class IndexSplitTransaction extends SplitTransaction { this.parent.initialize(); } catch (IOException e) { LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " + - this.parent.getRegionNameAsString(), e); + this.parent.getRegionInfo().getRegionNameAsString(), e); throw new RuntimeException(e); } break; http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java index add9b72..e361343 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java @@ -38,12 +38,12 @@ public class LocalIndexMerger extends BaseRegionServerObserver { private static final Log LOG = LogFactory.getLog(LocalIndexMerger.class); - private RegionMergeTransaction rmt = null; - private HRegion mergedRegion = null; + private RegionMergeTransactionImpl rmt = null; // FIXME: Use of private type + private HRegion mergedRegion = null; // FIXME: Use of private type @Override public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, - HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException { + Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { HTableDescriptor tableDesc = regionA.getTableDesc(); if (SchemaUtil.isSystemTable(tableDesc.getName())) { return; @@ -56,14 +56,14 @@ public class LocalIndexMerger extends BaseRegionServerObserver { TableName indexTable = TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName())); if (!MetaTableAccessor.tableExists(rs.getConnection(), indexTable)) return; - HRegion indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment()); + Region indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment()); if (indexRegionA == null) { LOG.warn("Index region corresponindg to data region " + regionA + " not in the same server. So skipping the merge."); ctx.bypass(); return; } - HRegion indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment()); + Region indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment()); if (indexRegionB == null) { LOG.warn("Index region corresponindg to region " + regionB + " not in the same server. So skipping the merge."); @@ -71,7 +71,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver { return; } try { - rmt = new RegionMergeTransaction(indexRegionA, indexRegionB, false); + rmt = new RegionMergeTransactionImpl(indexRegionA, indexRegionB, false); if (!rmt.prepare(rss)) { LOG.error("Prepare for the index regions merge [" + indexRegionA + "," + indexRegionB + "] failed. So returning null. "); @@ -97,7 +97,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver { @Override public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx, - HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException { + Region regionA, Region regionB, Region mergedRegion) throws IOException { if (rmt != null && this.mergedRegion != null) { RegionServerCoprocessorEnvironment environment = ctx.getEnvironment(); HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); @@ -107,7 +107,7 @@ public class LocalIndexMerger extends BaseRegionServerObserver { @Override public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, - HRegion regionA, HRegion regionB) throws IOException { + Region regionA, Region regionB) throws IOException { HRegionServer rs = (HRegionServer) ctx.getEnvironment().getRegionServerServices(); try { if (rmt != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java index 9af8251..7882e25 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java @@ -52,8 +52,8 @@ public class LocalIndexSplitter extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class); - private SplitTransaction st = null; - private PairOfSameType<HRegion> daughterRegions = null; + private SplitTransactionImpl st = null; // FIXME: Uses private type + private PairOfSameType<Region> daughterRegions = null; private static final ParseNodeFactory FACTORY = new ParseNodeFactory(); private static final int SPLIT_TXN_MINIMUM_SUPPORTED_VERSION = VersionUtil .encodeVersion("0.98.9"); @@ -74,17 +74,18 @@ public class LocalIndexSplitter extends BaseRegionObserver { TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName())); if (!MetaTableAccessor.tableExists(rss.getConnection(), indexTable)) return; - HRegion indexRegion = IndexUtil.getIndexRegion(environment); + Region indexRegion = IndexUtil.getIndexRegion(environment); if (indexRegion == null) { LOG.warn("Index region corresponindg to data region " + environment.getRegion() + " not in the same server. So skipping the split."); ctx.bypass(); return; } + // FIXME: Uses private type try { int encodedVersion = VersionUtil.encodeVersion(environment.getHBaseVersion()); if(encodedVersion >= SPLIT_TXN_MINIMUM_SUPPORTED_VERSION) { - st = new SplitTransaction(indexRegion, splitKey); + st = new SplitTransactionImpl(indexRegion, splitKey); st.useZKForAssignment = environment.getConfiguration().getBoolean("hbase.assignment.usezk", true); @@ -98,7 +99,7 @@ public class LocalIndexSplitter extends BaseRegionObserver { ctx.bypass(); return; } - indexRegion.forceSplit(splitKey); + ((HRegion)indexRegion).forceSplit(splitKey); daughterRegions = st.stepsBeforePONR(rss, rss, false); HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo()); copyOfParent.setOffline(true); http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index fc74968..d9e64e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; @@ -114,12 +114,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } - private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException { + private static void throwIfScanOutOfRegion(Scan scan, Region region) throws DoNotRetryIOException { boolean isLocalIndex = ScanUtil.isLocalIndex(scan); byte[] lowerInclusiveScanKey = scan.getStartRow(); byte[] upperExclusiveScanKey = scan.getStopRow(); - byte[] lowerInclusiveRegionKey = region.getStartKey(); - byte[] upperExclusiveRegionKey = region.getEndKey(); + byte[] lowerInclusiveRegionKey = region.getRegionInfo().getStartKey(); + byte[] upperExclusiveRegionKey = region.getRegionInfo().getEndKey(); boolean isStaleRegionBoundaries; if (isLocalIndex) { byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY); @@ -201,7 +201,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } } } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t); return null; // impossible } } @@ -221,7 +221,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, - final HRegion dataRegion, final IndexMaintainer indexMaintainer, + final Region dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, @@ -246,7 +246,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, - final HRegion dataRegion, final IndexMaintainer indexMaintainer, + final Region dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, final ImmutableBytesWritable ptr) { @@ -257,7 +257,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { try { return s.next(results); } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t); return false; // impossible } } @@ -267,7 +267,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { try { return s.next(result, scannerContext); } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t); return false; // impossible } } @@ -319,7 +319,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // There is a scanattribute set to retrieve the specific array element return next; } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t); return false; // impossible } } @@ -346,10 +346,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } // There is a scanattribute set to retrieve the specific array element return next; - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t); return false; // impossible - } + } } private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 19a1663..d613688 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; @@ -112,8 +112,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { * For local indexes, we need to set an offset on row key expressions to skip * the region start key. */ - HRegion region = c.getEnvironment().getRegion(); - offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length; + Region region = c.getEnvironment().getRegion(); + offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : + region.getRegionInfo().getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } @@ -128,7 +129,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); TupleProjector tupleProjector = null; - HRegion dataRegion = null; + Region dataRegion = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); @@ -415,7 +416,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } - HRegion region = c.getEnvironment().getRegion(); + Region region = c.getEnvironment().getRegion(); region.startRegionOperation(); try { synchronized (scanner) { @@ -495,7 +496,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // If we're calculating no aggregate functions, we can exit at the // start of a new row. Otherwise, we have to wait until an agg int countOffset = rowAggregators.length == 0 ? 1 : 0; - HRegion region = c.getEnvironment().getRegion(); + Region region = c.getEnvironment().getRegion(); region.startRegionOperation(); try { synchronized (scanner) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index e613007..39a4956 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -99,8 +99,8 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -404,7 +404,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso try { // TODO: check that key is within region.getStartKey() and region.getEndKey() // and return special code to force client to lookup region from meta. - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(key, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -434,7 +434,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, + private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp) throws IOException, SQLException { Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); RegionScanner scanner = region.getScanner(scan); @@ -464,7 +464,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region, + private List<PFunction> buildFunctions(List<byte[]> keys, Region region, long clientTimeStamp) throws IOException, SQLException { List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size()); for (byte[] key : keys) { @@ -914,7 +914,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp); } - private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, + private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; @@ -942,7 +942,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } - private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, + private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; @@ -989,7 +989,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException { - HRegion region = env.getRegion(); + Region region = env.getRegion(); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache @@ -1008,7 +1008,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException { - HRegion region = env.getRegion(); + Region region = env.getRegion(); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache @@ -1051,7 +1051,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso schemaName, tableName); byte[] parentKey = parentTableName == null ? null : lockKey; - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -1115,7 +1115,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return; } } - // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the + // TODO: Switch this to Region#batchMutate when we want to support indexes on the // system // table. Basically, we get all the locks that we don't already hold for all the // tableMetadata rows. This ensures we don't have deadlock situations (ensuring @@ -1125,7 +1125,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // on the system table. This is an issue because of the way we manage batch mutation // in the // Indexer. - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache @@ -1151,9 +1152,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } - private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks) + private static void acquireLock(Region region, byte[] key, List<RowLock> locks) throws IOException { - RowLock rowLock = region.getRowLock(key); + RowLock rowLock = region.getRowLock(key, true); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } @@ -1167,7 +1168,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso * TODO: should we pass a timestamp here? */ @SuppressWarnings("deprecation") - private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException { + private TableViewFinderResult findChildViews(Region region, byte[] tenantId, PTable table) throws IOException { byte[] schemaName = table.getSchemaName().getBytes(); byte[] tableName = table.getTableName().getBytes(); boolean isMultiTenant = table.isMultiTenant(); @@ -1256,7 +1257,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName); - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(key, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -1280,7 +1281,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); // Commit the list of deletion. - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata); for (ImmutableBytesPtr ckey : invalidateList) { metaDataCache.put(ckey, newDeletedTableMarker(currentTime)); @@ -1309,7 +1311,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete); - HRegion region = env.getRegion(); + Region region = env.getRegion(); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -1435,7 +1437,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static interface ColumnMutator { MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, - List<Mutation> tableMetadata, HRegion region, + List<Mutation> tableMetadata, Region region, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException, SQLException; } @@ -1449,7 +1451,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; try { byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName); - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(key, region); if (result != null) { return result; @@ -1535,7 +1537,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return result; } - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); // Invalidate from cache for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); @@ -1563,7 +1566,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() { @Override public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, - List<Mutation> tableMetaData, HRegion region, + List<Mutation> tableMetaData, Region region, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) { byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX]; byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX]; @@ -1647,14 +1650,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // get the co-processor environment // TODO: check that key is within region.getStartKey() and region.getEndKey() // and return special code to force client to lookup region from meta. - HRegion region = env.getRegion(); + Region region = env.getRegion(); /* * Lock directly on key, though it may be an index table. This will just prevent a table * from getting rebuilt too often. */ final boolean wasLocked = (rowLock != null); if (!wasLocked) { - rowLock = region.getRowLock(key); + rowLock = region.getRowLock(key, true); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } @@ -1689,7 +1692,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException { Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - HRegion region = env.getRegion(); + Region region = env.getRegion(); Collections.sort(keys, new Comparator<byte[]>() { @Override public int compare(byte[] o1, byte[] o2) { @@ -1700,11 +1703,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso * Lock directly on key, though it may be an index table. This will just prevent a table * from getting rebuilt too often. */ - List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());; + List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());; try { - rowLocks = new ArrayList<HRegion.RowLock>(keys.size()); + rowLocks = new ArrayList<Region.RowLock>(keys.size()); for (int i = 0; i < keys.size(); i++) { - HRegion.RowLock rowLock = region.getRowLock(keys.get(i)); + Region.RowLock rowLock = region.getRowLock(keys.get(i), true); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(keys.get(i))); @@ -1737,7 +1740,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if(functionsAvailable.size() == numFunctions) return functionsAvailable; return null; } finally { - for (HRegion.RowLock lock : rowLocks) { + for (Region.RowLock lock : rowLocks) { lock.release(); } rowLocks.clear(); @@ -1756,7 +1759,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() { @Override public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, - List<Mutation> tableMetaData, HRegion region, + List<Mutation> tableMetaData, Region region, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException, SQLException { byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX]; @@ -1904,7 +1907,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName); - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkTableKeyInRegion(key, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -1928,7 +1931,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } PIndexState newState = PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]); - RowLock rowLock = region.getRowLock(key); + RowLock rowLock = region.getRowLock(key, true); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } @@ -2019,7 +2022,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY); tableMetadata.add(p); } - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); // Invalidate from cache Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); metaDataCache.invalidate(cacheKey); @@ -2044,9 +2048,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) { - byte[] startKey = region.getStartKey(); - byte[] endKey = region.getEndKey(); + private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, Region region) { + byte[] startKey = region.getRegionInfo().getStartKey(); + byte[] endKey = region.getRegionInfo().getEndKey(); if (Bytes.compareTo(startKey, key) <= 0 && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, endKey) < 0)) { @@ -2056,9 +2060,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso EnvironmentEdgeManager.currentTimeMillis(), null); } - private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) { - byte[] startKey = region.getStartKey(); - byte[] endKey = region.getEndKey(); + private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, Region region) { + byte[] startKey = region.getRegionInfo().getStartKey(); + byte[] endKey = region.getRegionInfo().getEndKey(); if (Bytes.compareTo(startKey, key) <= 0 && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, endKey) < 0)) { @@ -2135,7 +2139,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tenantId = request.getTenantId().toByteArray(); List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount()); try { - HRegion region = env.getRegion(); + Region region = env.getRegion(); List<ByteString> functionNamesList = request.getFunctionNamesList(); List<Long> functionTimestampsList = request.getFunctionTimestampsList(); List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount()); @@ -2189,7 +2193,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName); - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -2225,7 +2229,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } // Don't store function info for temporary functions. if(!temporaryFunction) { - region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); } // Invalidate the cache - the next getFunction call will add it @@ -2259,7 +2263,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName); - HRegion region = env.getRegion(); + Region region = env.getRegion(); MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region); if (result != null) { done.run(MetaDataMutationResult.toProto(result)); @@ -2278,7 +2282,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } - region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet()); + region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData); @@ -2322,7 +2326,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso EnvironmentEdgeManager.currentTimeMillis(), null); } invalidateList.add(new FunctionBytesPtr(keys.get(0))); - HRegion region = env.getRegion(); + Region region = env.getRegion(); Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp); List<Cell> results = Lists.newArrayList(); try (RegionScanner scanner = region.getScanner(scan);) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 77e124d..54c688a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; @@ -176,8 +176,9 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { * For local indexes, we need to set an offset on row key expressions to skip * the region start key. */ - HRegion region = c.getEnvironment().getRegion(); - offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length; + Region region = c.getEnvironment().getRegion(); + offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : + region.getRegionInfo().getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } @@ -187,7 +188,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan( scan, innerScanner, arrayKVRefs); TupleProjector tupleProjector = null; - HRegion dataRegion = null; + Region dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); @@ -231,7 +232,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), tenantId); long estSize = iterator.getEstimatedByteSize(); final MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize); - final HRegion region = c.getEnvironment().getRegion(); + final Region region = c.getEnvironment().getRegion(); region.startRegionOperation(); try { // Once we return from the first call to next, we've run through and cached @@ -241,7 +242,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { long actualSize = iterator.getByteSize(); chunk.resize(actualSize); } catch (Throwable t) { - ServerUtil.throwIOException(region.getRegionNameAsString(), t); + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); return null; } finally { region.closeRegionOperation(); @@ -273,7 +274,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { tuple = iterator.next(); return !isFilterDone(); } catch (Throwable t) { - ServerUtil.throwIOException(region.getRegionNameAsString(), t); + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); return false; } } @@ -288,7 +289,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { iterator.close(); } } catch (SQLException e) { - ServerUtil.throwIOException(region.getRegionNameAsString(), e); + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e); } finally { chunk.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 7953933..9b5f040 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.exception.SQLExceptionCode; @@ -88,9 +88,9 @@ public class SequenceRegionObserver extends BaseRegionObserver { QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf))); } - private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks) + private static void acquireLock(Region region, byte[] key, List<RowLock> locks) throws IOException { - RowLock rowLock = region.getRowLock(key); + RowLock rowLock = region.getRowLock(key, true); if (rowLock == null) { throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); } @@ -114,7 +114,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { // We need to set this to prevent region.increment from being called e.bypass(); e.complete(); - HRegion region = env.getRegion(); + Region region = env.getRegion(); byte[] row = increment.getRow(); List<RowLock> locks = Lists.newArrayList(); TimeRange tr = increment.getTimeRange(); @@ -251,7 +251,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { } // update the KeyValues on the server Mutation[] mutations = new Mutation[]{put}; - region.batchMutate(mutations); + region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); // return a Result with the updated KeyValues return Result.create(cells); } finally { @@ -345,7 +345,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { // We need to set this to prevent region.append from being called e.bypass(); e.complete(); - HRegion region = env.getRegion(); + Region region = env.getRegion(); byte[] row = append.getRow(); List<RowLock> locks = Lists.newArrayList(); region.startRegionOperation(); @@ -400,7 +400,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { } } Mutation[] mutations = new Mutation[]{m}; - region.batchMutate(mutations); + region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); long serverTimestamp = MetaDataUtil.getClientTimeStamp(m); // Return result with single KeyValue. The only piece of information // the client cares about is the timestamp, which is the timestamp of http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 2d6d98a..d5cc486 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -125,7 +125,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ this.kvBuilder = GenericKeyValueBuilder.INSTANCE; } - private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException { + private static void commitBatch(Region region, List<Mutation> mutations, byte[] indexUUID) throws IOException { if (indexUUID != null) { for (Mutation m : mutations) { m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); @@ -133,7 +133,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } Mutation[] mutationArray = new Mutation[mutations.size()]; // TODO: should we use the one that is all or none? - region.batchMutate(mutations.toArray(mutationArray)); + region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } public static void serializeIntoScan(Scan scan) { @@ -158,7 +158,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; - HRegion region = c.getEnvironment().getRegion(); + Region region = c.getEnvironment().getRegion(); long ts = scan.getTimeRange().getMax(); StatisticsCollector stats = null; if(ScanUtil.isAnalyzeTable(scan)) { @@ -172,7 +172,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ * For local indexes, we need to set an offset on row key expressions to skip * the region start key. */ - offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length; + offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : + region.getRegionInfo().getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } @@ -212,7 +213,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ ptr = new ImmutableBytesWritable(); } TupleProjector tupleProjector = null; - HRegion dataRegion = null; + Region dataRegion = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); boolean localIndexScan = ScanUtil.isLocalIndex(scan); @@ -279,8 +280,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ results); Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, - c.getEnvironment().getRegion().getStartKey(), - c.getEnvironment().getRegion().getEndKey()); + c.getEnvironment().getRegion().getRegionInfo().getStartKey(), + c.getEnvironment().getRegion().getRegionInfo().getEndKey()); indexMutations.add(put); } } @@ -391,7 +392,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } catch (ConstraintViolationException e) { // Log and ignore in count logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + - region.getRegionNameAsString() + " with values " + + region.getRegionInfo().getRegionNameAsString() + " with values " + SchemaUtil.toString(values), ScanUtil.getCustomAnnotations(scan)), e); continue; @@ -479,9 +480,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } private void commitIndexMutations(final ObserverContext<RegionCoprocessorEnvironment> c, - HRegion region, List<Mutation> indexMutations) throws IOException { + Region region, List<Mutation> indexMutations) throws IOException { // Get indexRegion corresponding to data region - HRegion indexRegion = IndexUtil.getIndexRegion(c.getEnvironment()); + Region indexRegion = IndexUtil.getIndexRegion(c.getEnvironment()); if (indexRegion != null) { commitBatch(indexRegion, indexMutations, null); } else { @@ -493,7 +494,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ table = c.getEnvironment().getTable(indexTable); table.batch(indexMutations); } catch (InterruptedException ie) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), ie); } finally { if (table != null) table.close(); @@ -534,9 +535,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override - public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) + public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, Region l, Region r) throws IOException { - HRegion region = e.getEnvironment().getRegion(); + Region region = e.getEnvironment().getRegion(); TableName table = region.getRegionInfo().getTable(); StatisticsCollector stats = null; try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java index 71cc1d6..549fe8c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java @@ -24,12 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -60,7 +59,7 @@ public class LocalTable implements LocalHBaseState { Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns)); s.setStartRow(row); s.setStopRow(row); - HRegion region = this.env.getRegion(); + Region region = this.env.getRegion(); RegionScanner scanner = region.getScanner(s); List<Cell> kvs = new ArrayList<Cell>(1); boolean more = scanner.next(kvs); http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index f72dec0..56bf637 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -21,11 +21,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure; import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; @@ -150,10 +151,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { // as well. try { if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) { - HRegion indexRegion = IndexUtil.getIndexRegion(env); + Region indexRegion = IndexUtil.getIndexRegion(env); if (indexRegion != null) { throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()])); + indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]), + HConstants.NO_NONCE, HConstants.NO_NONCE); return null; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java index 4d5f667..26da2d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/PerRegionIndexWriteCache.java @@ -22,7 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; @@ -32,8 +32,8 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; public class PerRegionIndexWriteCache { - private Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache = - new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>(); + private Map<Region, Multimap<HTableInterfaceReference, Mutation>> cache = + new HashMap<Region, Multimap<HTableInterfaceReference, Mutation>>(); /** @@ -43,7 +43,7 @@ public class PerRegionIndexWriteCache { * @return Get the edits for the given region. Returns <tt>null</tt> if there are no pending edits * for the region */ - public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) { + public Multimap<HTableInterfaceReference, Mutation> getEdits(Region region) { return cache.remove(region); } @@ -52,7 +52,7 @@ public class PerRegionIndexWriteCache { * @param table * @param collection */ - public void addEdits(HRegion region, HTableInterfaceReference table, + public void addEdits(Region region, HTableInterfaceReference table, Collection<Mutation> collection) { Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region); if (edits == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java index f36affb..189f970 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import com.google.common.collect.Multimap; import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; @@ -41,7 +41,7 @@ public class StoreFailuresInCachePolicy implements IndexFailurePolicy { private KillServerOnFailurePolicy delegate; private PerRegionIndexWriteCache cache; - private HRegion region; + private Region region; /** * @param failedIndexEdits cache to update when we find a failure http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java index 9171b53..b1b2656 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java @@ -23,11 +23,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.phoenix.hbase.index.CapturingAbortable; import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; @@ -154,10 +155,11 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { // index is pretty hacky. If we're going to keep this, we should revisit that // as well. if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) { - HRegion indexRegion = IndexUtil.getIndexRegion(env); + Region indexRegion = IndexUtil.getIndexRegion(env); if (indexRegion != null) { throwFailureIfDone(); - indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()])); + indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]), + HConstants.NO_NONCE, HConstants.NO_NONCE); return Boolean.TRUE; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index b5e6a63..7a45e21 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -28,8 +28,8 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; @@ -73,7 +73,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); scanRanges.initializeScan(scan); scan.setFilter(scanRanges.getSkipScanFilter()); - HRegion region = this.env.getRegion(); + Region region = this.env.getRegion(); RegionScanner scanner = region.getScanner(scan); // Run through the scanner using internal nextRaw method region.startRegionOperation(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/edff624f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 99e26d1..222aefb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -24,9 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; @@ -166,14 +164,14 @@ public class PhoenixIndexCodec extends BaseIndexCodec { Mutation mutation = null; if (upsert) { mutation = - maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state - .getCurrentTimestamp(), env.getRegion().getStartKey(), env - .getRegion().getEndKey()); + maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), + env.getRegion().getRegionInfo().getStartKey(), + env.getRegion().getRegionInfo().getEndKey()); } else { mutation = - maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state - .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion() - .getStartKey(), env.getRegion().getEndKey()); + maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(), + state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), + env.getRegion().getRegionInfo().getEndKey()); } indexUpdate.setUpdate(mutation); if (scanner != null) {