This is an automated email from the ASF dual-hosted git repository. vincentpoon pushed a commit to branch 4.x-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new a705c22 PHOENIX-5137 check region close before commiting a batch for index rebuild a705c22 is described below commit a705c22082200f86a428c22e875b119d7c188157 Author: Kiran Kumar Maturi <maturi.ki...@gmail.com> AuthorDate: Fri Feb 22 09:45:13 2019 +0530 PHOENIX-5137 check region close before commiting a batch for index rebuild --- .../UngroupedAggregateRegionObserver.java | 30 +++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index cab13f1..5923a75 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -263,7 +263,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return; } - Mutation[] mutationArray = new Mutation[mutations.size()]; + Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { @@ -374,6 +374,17 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver super.clear(); } } + + private long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDesc().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1); + } @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException { @@ -499,12 +510,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); - long flushSize = region.getTableDesc().getMemStoreFlushSize(); - - if (flushSize <= 0) { - flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, - HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); - } /** * Slow down the writes if the memstore size more than @@ -512,9 +517,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * bytes. This avoids flush storm to hdfs for cases like index building where reads and * write happen to all the table regions in the server. */ - final long blockingMemStoreSize = flushSize * ( - conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; + final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if(buildLocalIndex) { @@ -1060,6 +1063,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + final long blockingMemstoreSize = getBlockingMemstoreSize(region, config); MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); @@ -1101,7 +1105,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - commitBatchWithRetries(region, mutations, -1); + checkForRegionClosingOrSplitting(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1110,7 +1115,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { - commitBatchWithRetries(region, mutations, -1); + checkForRegionClosingOrSplitting(); + commitBatchWithRetries(region, mutations, blockingMemstoreSize); } } } catch (IOException e) {