http://git-wip-us.apache.org/repos/asf/hbase-site/blob/99c53df1/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.html index f6a717e..a701ece 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.html @@ -7930,329 +7930,342 @@ <span class="sourceLineNo">7922</span> case DELETE:<a name="line.7922"></a> <span class="sourceLineNo">7923</span> case BATCH_MUTATE:<a name="line.7923"></a> <span class="sourceLineNo">7924</span> case COMPACT_REGION:<a name="line.7924"></a> -<span class="sourceLineNo">7925</span> // when a region is in recovering state, no read, split or merge is allowed<a name="line.7925"></a> -<span class="sourceLineNo">7926</span> if (isRecovering() && (this.disallowWritesInRecovering ||<a name="line.7926"></a> -<span class="sourceLineNo">7927</span> (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {<a name="line.7927"></a> -<span class="sourceLineNo">7928</span> throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +<a name="line.7928"></a> -<span class="sourceLineNo">7929</span> " is recovering; cannot take reads");<a name="line.7929"></a> -<span class="sourceLineNo">7930</span> }<a name="line.7930"></a> -<span class="sourceLineNo">7931</span> break;<a name="line.7931"></a> -<span class="sourceLineNo">7932</span> default:<a name="line.7932"></a> -<span class="sourceLineNo">7933</span> break;<a name="line.7933"></a> -<span class="sourceLineNo">7934</span> }<a name="line.7934"></a> -<span class="sourceLineNo">7935</span> if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION<a name="line.7935"></a> -<span class="sourceLineNo">7936</span> || op == Operation.COMPACT_REGION) {<a name="line.7936"></a> -<span class="sourceLineNo">7937</span> // split, merge or compact region doesn't need to check the closing/closed state or lock the<a name="line.7937"></a> -<span class="sourceLineNo">7938</span> // region<a name="line.7938"></a> -<span class="sourceLineNo">7939</span> return;<a name="line.7939"></a> -<span class="sourceLineNo">7940</span> }<a name="line.7940"></a> -<span class="sourceLineNo">7941</span> if (this.closing.get()) {<a name="line.7941"></a> -<span class="sourceLineNo">7942</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");<a name="line.7942"></a> -<span class="sourceLineNo">7943</span> }<a name="line.7943"></a> -<span class="sourceLineNo">7944</span> lock(lock.readLock());<a name="line.7944"></a> -<span class="sourceLineNo">7945</span> if (this.closed.get()) {<a name="line.7945"></a> -<span class="sourceLineNo">7946</span> lock.readLock().unlock();<a name="line.7946"></a> -<span class="sourceLineNo">7947</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");<a name="line.7947"></a> -<span class="sourceLineNo">7948</span> }<a name="line.7948"></a> -<span class="sourceLineNo">7949</span> try {<a name="line.7949"></a> -<span class="sourceLineNo">7950</span> if (coprocessorHost != null) {<a name="line.7950"></a> -<span class="sourceLineNo">7951</span> coprocessorHost.postStartRegionOperation(op);<a name="line.7951"></a> -<span class="sourceLineNo">7952</span> }<a name="line.7952"></a> -<span class="sourceLineNo">7953</span> } catch (Exception e) {<a name="line.7953"></a> -<span class="sourceLineNo">7954</span> lock.readLock().unlock();<a name="line.7954"></a> -<span class="sourceLineNo">7955</span> throw new IOException(e);<a name="line.7955"></a> -<span class="sourceLineNo">7956</span> }<a name="line.7956"></a> -<span class="sourceLineNo">7957</span> }<a name="line.7957"></a> -<span class="sourceLineNo">7958</span><a name="line.7958"></a> -<span class="sourceLineNo">7959</span> @Override<a name="line.7959"></a> -<span class="sourceLineNo">7960</span> public void closeRegionOperation() throws IOException {<a name="line.7960"></a> -<span class="sourceLineNo">7961</span> closeRegionOperation(Operation.ANY);<a name="line.7961"></a> -<span class="sourceLineNo">7962</span> }<a name="line.7962"></a> -<span class="sourceLineNo">7963</span><a name="line.7963"></a> -<span class="sourceLineNo">7964</span> /**<a name="line.7964"></a> -<span class="sourceLineNo">7965</span> * Closes the lock. This needs to be called in the finally block corresponding<a name="line.7965"></a> -<span class="sourceLineNo">7966</span> * to the try block of {@link #startRegionOperation(Operation)}<a name="line.7966"></a> -<span class="sourceLineNo">7967</span> * @throws IOException<a name="line.7967"></a> -<span class="sourceLineNo">7968</span> */<a name="line.7968"></a> -<span class="sourceLineNo">7969</span> public void closeRegionOperation(Operation operation) throws IOException {<a name="line.7969"></a> -<span class="sourceLineNo">7970</span> lock.readLock().unlock();<a name="line.7970"></a> -<span class="sourceLineNo">7971</span> if (coprocessorHost != null) {<a name="line.7971"></a> -<span class="sourceLineNo">7972</span> coprocessorHost.postCloseRegionOperation(operation);<a name="line.7972"></a> -<span class="sourceLineNo">7973</span> }<a name="line.7973"></a> -<span class="sourceLineNo">7974</span> }<a name="line.7974"></a> -<span class="sourceLineNo">7975</span><a name="line.7975"></a> -<span class="sourceLineNo">7976</span> /**<a name="line.7976"></a> -<span class="sourceLineNo">7977</span> * This method needs to be called before any public call that reads or<a name="line.7977"></a> -<span class="sourceLineNo">7978</span> * modifies stores in bulk. It has to be called just before a try.<a name="line.7978"></a> -<span class="sourceLineNo">7979</span> * #closeBulkRegionOperation needs to be called in the try's finally block<a name="line.7979"></a> -<span class="sourceLineNo">7980</span> * Acquires a writelock and checks if the region is closing or closed.<a name="line.7980"></a> -<span class="sourceLineNo">7981</span> * @throws NotServingRegionException when the region is closing or closed<a name="line.7981"></a> -<span class="sourceLineNo">7982</span> * @throws RegionTooBusyException if failed to get the lock in time<a name="line.7982"></a> -<span class="sourceLineNo">7983</span> * @throws InterruptedIOException if interrupted while waiting for a lock<a name="line.7983"></a> -<span class="sourceLineNo">7984</span> */<a name="line.7984"></a> -<span class="sourceLineNo">7985</span> private void startBulkRegionOperation(boolean writeLockNeeded)<a name="line.7985"></a> -<span class="sourceLineNo">7986</span> throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {<a name="line.7986"></a> -<span class="sourceLineNo">7987</span> if (this.closing.get()) {<a name="line.7987"></a> -<span class="sourceLineNo">7988</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");<a name="line.7988"></a> -<span class="sourceLineNo">7989</span> }<a name="line.7989"></a> -<span class="sourceLineNo">7990</span> if (writeLockNeeded) lock(lock.writeLock());<a name="line.7990"></a> -<span class="sourceLineNo">7991</span> else lock(lock.readLock());<a name="line.7991"></a> -<span class="sourceLineNo">7992</span> if (this.closed.get()) {<a name="line.7992"></a> -<span class="sourceLineNo">7993</span> if (writeLockNeeded) lock.writeLock().unlock();<a name="line.7993"></a> -<span class="sourceLineNo">7994</span> else lock.readLock().unlock();<a name="line.7994"></a> -<span class="sourceLineNo">7995</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");<a name="line.7995"></a> -<span class="sourceLineNo">7996</span> }<a name="line.7996"></a> -<span class="sourceLineNo">7997</span> }<a name="line.7997"></a> -<span class="sourceLineNo">7998</span><a name="line.7998"></a> -<span class="sourceLineNo">7999</span> /**<a name="line.7999"></a> -<span class="sourceLineNo">8000</span> * Closes the lock. This needs to be called in the finally block corresponding<a name="line.8000"></a> -<span class="sourceLineNo">8001</span> * to the try block of #startRegionOperation<a name="line.8001"></a> -<span class="sourceLineNo">8002</span> */<a name="line.8002"></a> -<span class="sourceLineNo">8003</span> private void closeBulkRegionOperation(){<a name="line.8003"></a> -<span class="sourceLineNo">8004</span> if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();<a name="line.8004"></a> -<span class="sourceLineNo">8005</span> else lock.readLock().unlock();<a name="line.8005"></a> -<span class="sourceLineNo">8006</span> }<a name="line.8006"></a> -<span class="sourceLineNo">8007</span><a name="line.8007"></a> -<span class="sourceLineNo">8008</span> /**<a name="line.8008"></a> -<span class="sourceLineNo">8009</span> * Update LongAdders for number of puts without wal and the size of possible data loss.<a name="line.8009"></a> -<span class="sourceLineNo">8010</span> * These information are exposed by the region server metrics.<a name="line.8010"></a> -<span class="sourceLineNo">8011</span> */<a name="line.8011"></a> -<span class="sourceLineNo">8012</span> private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {<a name="line.8012"></a> -<span class="sourceLineNo">8013</span> numMutationsWithoutWAL.increment();<a name="line.8013"></a> -<span class="sourceLineNo">8014</span> if (numMutationsWithoutWAL.sum() <= 1) {<a name="line.8014"></a> -<span class="sourceLineNo">8015</span> LOG.info("writing data to region " + this +<a name="line.8015"></a> -<span class="sourceLineNo">8016</span> " with WAL disabled. Data may be lost in the event of a crash.");<a name="line.8016"></a> -<span class="sourceLineNo">8017</span> }<a name="line.8017"></a> -<span class="sourceLineNo">8018</span><a name="line.8018"></a> -<span class="sourceLineNo">8019</span> long mutationSize = 0;<a name="line.8019"></a> -<span class="sourceLineNo">8020</span> for (List<Cell> cells: familyMap.values()) {<a name="line.8020"></a> -<span class="sourceLineNo">8021</span> assert cells instanceof RandomAccess;<a name="line.8021"></a> -<span class="sourceLineNo">8022</span> int listSize = cells.size();<a name="line.8022"></a> -<span class="sourceLineNo">8023</span> for (int i=0; i < listSize; i++) {<a name="line.8023"></a> -<span class="sourceLineNo">8024</span> Cell cell = cells.get(i);<a name="line.8024"></a> -<span class="sourceLineNo">8025</span> mutationSize += KeyValueUtil.length(cell);<a name="line.8025"></a> -<span class="sourceLineNo">8026</span> }<a name="line.8026"></a> -<span class="sourceLineNo">8027</span> }<a name="line.8027"></a> -<span class="sourceLineNo">8028</span><a name="line.8028"></a> -<span class="sourceLineNo">8029</span> dataInMemoryWithoutWAL.add(mutationSize);<a name="line.8029"></a> -<span class="sourceLineNo">8030</span> }<a name="line.8030"></a> +<span class="sourceLineNo">7925</span> case SNAPSHOT:<a name="line.7925"></a> +<span class="sourceLineNo">7926</span> // when a region is in recovering state, no read, split, merge or snapshot is allowed<a name="line.7926"></a> +<span class="sourceLineNo">7927</span> if (isRecovering() && (this.disallowWritesInRecovering ||<a name="line.7927"></a> +<span class="sourceLineNo">7928</span> (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {<a name="line.7928"></a> +<span class="sourceLineNo">7929</span> throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +<a name="line.7929"></a> +<span class="sourceLineNo">7930</span> " is recovering; cannot take reads");<a name="line.7930"></a> +<span class="sourceLineNo">7931</span> }<a name="line.7931"></a> +<span class="sourceLineNo">7932</span> break;<a name="line.7932"></a> +<span class="sourceLineNo">7933</span> default:<a name="line.7933"></a> +<span class="sourceLineNo">7934</span> break;<a name="line.7934"></a> +<span class="sourceLineNo">7935</span> }<a name="line.7935"></a> +<span class="sourceLineNo">7936</span> if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION<a name="line.7936"></a> +<span class="sourceLineNo">7937</span> || op == Operation.COMPACT_REGION) {<a name="line.7937"></a> +<span class="sourceLineNo">7938</span> // split, merge or compact region doesn't need to check the closing/closed state or lock the<a name="line.7938"></a> +<span class="sourceLineNo">7939</span> // region<a name="line.7939"></a> +<span class="sourceLineNo">7940</span> return;<a name="line.7940"></a> +<span class="sourceLineNo">7941</span> }<a name="line.7941"></a> +<span class="sourceLineNo">7942</span> if (this.closing.get()) {<a name="line.7942"></a> +<span class="sourceLineNo">7943</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");<a name="line.7943"></a> +<span class="sourceLineNo">7944</span> }<a name="line.7944"></a> +<span class="sourceLineNo">7945</span> lock(lock.readLock());<a name="line.7945"></a> +<span class="sourceLineNo">7946</span> if (this.closed.get()) {<a name="line.7946"></a> +<span class="sourceLineNo">7947</span> lock.readLock().unlock();<a name="line.7947"></a> +<span class="sourceLineNo">7948</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");<a name="line.7948"></a> +<span class="sourceLineNo">7949</span> }<a name="line.7949"></a> +<span class="sourceLineNo">7950</span> // The unit for snapshot is a region. So, all stores for this region must be<a name="line.7950"></a> +<span class="sourceLineNo">7951</span> // prepared for snapshot operation before proceeding.<a name="line.7951"></a> +<span class="sourceLineNo">7952</span> if (op == Operation.SNAPSHOT) {<a name="line.7952"></a> +<span class="sourceLineNo">7953</span> for (Store store : stores.values()) {<a name="line.7953"></a> +<span class="sourceLineNo">7954</span> if (store instanceof HStore) {<a name="line.7954"></a> +<span class="sourceLineNo">7955</span> ((HStore)store).preSnapshotOperation();<a name="line.7955"></a> +<span class="sourceLineNo">7956</span> }<a name="line.7956"></a> +<span class="sourceLineNo">7957</span> }<a name="line.7957"></a> +<span class="sourceLineNo">7958</span> }<a name="line.7958"></a> +<span class="sourceLineNo">7959</span> try {<a name="line.7959"></a> +<span class="sourceLineNo">7960</span> if (coprocessorHost != null) {<a name="line.7960"></a> +<span class="sourceLineNo">7961</span> coprocessorHost.postStartRegionOperation(op);<a name="line.7961"></a> +<span class="sourceLineNo">7962</span> }<a name="line.7962"></a> +<span class="sourceLineNo">7963</span> } catch (Exception e) {<a name="line.7963"></a> +<span class="sourceLineNo">7964</span> lock.readLock().unlock();<a name="line.7964"></a> +<span class="sourceLineNo">7965</span> throw new IOException(e);<a name="line.7965"></a> +<span class="sourceLineNo">7966</span> }<a name="line.7966"></a> +<span class="sourceLineNo">7967</span> }<a name="line.7967"></a> +<span class="sourceLineNo">7968</span><a name="line.7968"></a> +<span class="sourceLineNo">7969</span> @Override<a name="line.7969"></a> +<span class="sourceLineNo">7970</span> public void closeRegionOperation() throws IOException {<a name="line.7970"></a> +<span class="sourceLineNo">7971</span> closeRegionOperation(Operation.ANY);<a name="line.7971"></a> +<span class="sourceLineNo">7972</span> }<a name="line.7972"></a> +<span class="sourceLineNo">7973</span><a name="line.7973"></a> +<span class="sourceLineNo">7974</span> @Override<a name="line.7974"></a> +<span class="sourceLineNo">7975</span> public void closeRegionOperation(Operation operation) throws IOException {<a name="line.7975"></a> +<span class="sourceLineNo">7976</span> if (operation == Operation.SNAPSHOT) {<a name="line.7976"></a> +<span class="sourceLineNo">7977</span> for (Store store: stores.values()) {<a name="line.7977"></a> +<span class="sourceLineNo">7978</span> if (store instanceof HStore) {<a name="line.7978"></a> +<span class="sourceLineNo">7979</span> ((HStore)store).postSnapshotOperation();<a name="line.7979"></a> +<span class="sourceLineNo">7980</span> }<a name="line.7980"></a> +<span class="sourceLineNo">7981</span> }<a name="line.7981"></a> +<span class="sourceLineNo">7982</span> }<a name="line.7982"></a> +<span class="sourceLineNo">7983</span> lock.readLock().unlock();<a name="line.7983"></a> +<span class="sourceLineNo">7984</span> if (coprocessorHost != null) {<a name="line.7984"></a> +<span class="sourceLineNo">7985</span> coprocessorHost.postCloseRegionOperation(operation);<a name="line.7985"></a> +<span class="sourceLineNo">7986</span> }<a name="line.7986"></a> +<span class="sourceLineNo">7987</span> }<a name="line.7987"></a> +<span class="sourceLineNo">7988</span><a name="line.7988"></a> +<span class="sourceLineNo">7989</span> /**<a name="line.7989"></a> +<span class="sourceLineNo">7990</span> * This method needs to be called before any public call that reads or<a name="line.7990"></a> +<span class="sourceLineNo">7991</span> * modifies stores in bulk. It has to be called just before a try.<a name="line.7991"></a> +<span class="sourceLineNo">7992</span> * #closeBulkRegionOperation needs to be called in the try's finally block<a name="line.7992"></a> +<span class="sourceLineNo">7993</span> * Acquires a writelock and checks if the region is closing or closed.<a name="line.7993"></a> +<span class="sourceLineNo">7994</span> * @throws NotServingRegionException when the region is closing or closed<a name="line.7994"></a> +<span class="sourceLineNo">7995</span> * @throws RegionTooBusyException if failed to get the lock in time<a name="line.7995"></a> +<span class="sourceLineNo">7996</span> * @throws InterruptedIOException if interrupted while waiting for a lock<a name="line.7996"></a> +<span class="sourceLineNo">7997</span> */<a name="line.7997"></a> +<span class="sourceLineNo">7998</span> private void startBulkRegionOperation(boolean writeLockNeeded)<a name="line.7998"></a> +<span class="sourceLineNo">7999</span> throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {<a name="line.7999"></a> +<span class="sourceLineNo">8000</span> if (this.closing.get()) {<a name="line.8000"></a> +<span class="sourceLineNo">8001</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");<a name="line.8001"></a> +<span class="sourceLineNo">8002</span> }<a name="line.8002"></a> +<span class="sourceLineNo">8003</span> if (writeLockNeeded) lock(lock.writeLock());<a name="line.8003"></a> +<span class="sourceLineNo">8004</span> else lock(lock.readLock());<a name="line.8004"></a> +<span class="sourceLineNo">8005</span> if (this.closed.get()) {<a name="line.8005"></a> +<span class="sourceLineNo">8006</span> if (writeLockNeeded) lock.writeLock().unlock();<a name="line.8006"></a> +<span class="sourceLineNo">8007</span> else lock.readLock().unlock();<a name="line.8007"></a> +<span class="sourceLineNo">8008</span> throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");<a name="line.8008"></a> +<span class="sourceLineNo">8009</span> }<a name="line.8009"></a> +<span class="sourceLineNo">8010</span> }<a name="line.8010"></a> +<span class="sourceLineNo">8011</span><a name="line.8011"></a> +<span class="sourceLineNo">8012</span> /**<a name="line.8012"></a> +<span class="sourceLineNo">8013</span> * Closes the lock. This needs to be called in the finally block corresponding<a name="line.8013"></a> +<span class="sourceLineNo">8014</span> * to the try block of #startRegionOperation<a name="line.8014"></a> +<span class="sourceLineNo">8015</span> */<a name="line.8015"></a> +<span class="sourceLineNo">8016</span> private void closeBulkRegionOperation(){<a name="line.8016"></a> +<span class="sourceLineNo">8017</span> if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();<a name="line.8017"></a> +<span class="sourceLineNo">8018</span> else lock.readLock().unlock();<a name="line.8018"></a> +<span class="sourceLineNo">8019</span> }<a name="line.8019"></a> +<span class="sourceLineNo">8020</span><a name="line.8020"></a> +<span class="sourceLineNo">8021</span> /**<a name="line.8021"></a> +<span class="sourceLineNo">8022</span> * Update LongAdders for number of puts without wal and the size of possible data loss.<a name="line.8022"></a> +<span class="sourceLineNo">8023</span> * These information are exposed by the region server metrics.<a name="line.8023"></a> +<span class="sourceLineNo">8024</span> */<a name="line.8024"></a> +<span class="sourceLineNo">8025</span> private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {<a name="line.8025"></a> +<span class="sourceLineNo">8026</span> numMutationsWithoutWAL.increment();<a name="line.8026"></a> +<span class="sourceLineNo">8027</span> if (numMutationsWithoutWAL.sum() <= 1) {<a name="line.8027"></a> +<span class="sourceLineNo">8028</span> LOG.info("writing data to region " + this +<a name="line.8028"></a> +<span class="sourceLineNo">8029</span> " with WAL disabled. Data may be lost in the event of a crash.");<a name="line.8029"></a> +<span class="sourceLineNo">8030</span> }<a name="line.8030"></a> <span class="sourceLineNo">8031</span><a name="line.8031"></a> -<span class="sourceLineNo">8032</span> private void lock(final Lock lock)<a name="line.8032"></a> -<span class="sourceLineNo">8033</span> throws RegionTooBusyException, InterruptedIOException {<a name="line.8033"></a> -<span class="sourceLineNo">8034</span> lock(lock, 1);<a name="line.8034"></a> -<span class="sourceLineNo">8035</span> }<a name="line.8035"></a> -<span class="sourceLineNo">8036</span><a name="line.8036"></a> -<span class="sourceLineNo">8037</span> /**<a name="line.8037"></a> -<span class="sourceLineNo">8038</span> * Try to acquire a lock. Throw RegionTooBusyException<a name="line.8038"></a> -<span class="sourceLineNo">8039</span> * if failed to get the lock in time. Throw InterruptedIOException<a name="line.8039"></a> -<span class="sourceLineNo">8040</span> * if interrupted while waiting for the lock.<a name="line.8040"></a> -<span class="sourceLineNo">8041</span> */<a name="line.8041"></a> -<span class="sourceLineNo">8042</span> private void lock(final Lock lock, final int multiplier)<a name="line.8042"></a> -<span class="sourceLineNo">8043</span> throws RegionTooBusyException, InterruptedIOException {<a name="line.8043"></a> -<span class="sourceLineNo">8044</span> try {<a name="line.8044"></a> -<span class="sourceLineNo">8045</span> final long waitTime = Math.min(maxBusyWaitDuration,<a name="line.8045"></a> -<span class="sourceLineNo">8046</span> busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));<a name="line.8046"></a> -<span class="sourceLineNo">8047</span> if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {<a name="line.8047"></a> -<span class="sourceLineNo">8048</span> throw new RegionTooBusyException(<a name="line.8048"></a> -<span class="sourceLineNo">8049</span> "failed to get a lock in " + waitTime + " ms. " +<a name="line.8049"></a> -<span class="sourceLineNo">8050</span> "regionName=" + (this.getRegionInfo() == null ? "unknown" :<a name="line.8050"></a> -<span class="sourceLineNo">8051</span> this.getRegionInfo().getRegionNameAsString()) +<a name="line.8051"></a> -<span class="sourceLineNo">8052</span> ", server=" + (this.getRegionServerServices() == null ? "unknown" :<a name="line.8052"></a> -<span class="sourceLineNo">8053</span> this.getRegionServerServices().getServerName()));<a name="line.8053"></a> -<span class="sourceLineNo">8054</span> }<a name="line.8054"></a> -<span class="sourceLineNo">8055</span> } catch (InterruptedException ie) {<a name="line.8055"></a> -<span class="sourceLineNo">8056</span> LOG.info("Interrupted while waiting for a lock");<a name="line.8056"></a> -<span class="sourceLineNo">8057</span> InterruptedIOException iie = new InterruptedIOException();<a name="line.8057"></a> -<span class="sourceLineNo">8058</span> iie.initCause(ie);<a name="line.8058"></a> -<span class="sourceLineNo">8059</span> throw iie;<a name="line.8059"></a> -<span class="sourceLineNo">8060</span> }<a name="line.8060"></a> -<span class="sourceLineNo">8061</span> }<a name="line.8061"></a> -<span class="sourceLineNo">8062</span><a name="line.8062"></a> -<span class="sourceLineNo">8063</span> /**<a name="line.8063"></a> -<span class="sourceLineNo">8064</span> * Calls sync with the given transaction ID<a name="line.8064"></a> -<span class="sourceLineNo">8065</span> * @param txid should sync up to which transaction<a name="line.8065"></a> -<span class="sourceLineNo">8066</span> * @throws IOException If anything goes wrong with DFS<a name="line.8066"></a> -<span class="sourceLineNo">8067</span> */<a name="line.8067"></a> -<span class="sourceLineNo">8068</span> private void sync(long txid, Durability durability) throws IOException {<a name="line.8068"></a> -<span class="sourceLineNo">8069</span> if (this.getRegionInfo().isMetaRegion()) {<a name="line.8069"></a> -<span class="sourceLineNo">8070</span> this.wal.sync(txid);<a name="line.8070"></a> -<span class="sourceLineNo">8071</span> } else {<a name="line.8071"></a> -<span class="sourceLineNo">8072</span> switch(durability) {<a name="line.8072"></a> -<span class="sourceLineNo">8073</span> case USE_DEFAULT:<a name="line.8073"></a> -<span class="sourceLineNo">8074</span> // do what table defaults to<a name="line.8074"></a> -<span class="sourceLineNo">8075</span> if (shouldSyncWAL()) {<a name="line.8075"></a> -<span class="sourceLineNo">8076</span> this.wal.sync(txid);<a name="line.8076"></a> -<span class="sourceLineNo">8077</span> }<a name="line.8077"></a> -<span class="sourceLineNo">8078</span> break;<a name="line.8078"></a> -<span class="sourceLineNo">8079</span> case SKIP_WAL:<a name="line.8079"></a> -<span class="sourceLineNo">8080</span> // nothing do to<a name="line.8080"></a> -<span class="sourceLineNo">8081</span> break;<a name="line.8081"></a> -<span class="sourceLineNo">8082</span> case ASYNC_WAL:<a name="line.8082"></a> -<span class="sourceLineNo">8083</span> // nothing do to<a name="line.8083"></a> -<span class="sourceLineNo">8084</span> break;<a name="line.8084"></a> -<span class="sourceLineNo">8085</span> case SYNC_WAL:<a name="line.8085"></a> -<span class="sourceLineNo">8086</span> case FSYNC_WAL:<a name="line.8086"></a> -<span class="sourceLineNo">8087</span> // sync the WAL edit (SYNC and FSYNC treated the same for now)<a name="line.8087"></a> -<span class="sourceLineNo">8088</span> this.wal.sync(txid);<a name="line.8088"></a> -<span class="sourceLineNo">8089</span> break;<a name="line.8089"></a> -<span class="sourceLineNo">8090</span> default:<a name="line.8090"></a> -<span class="sourceLineNo">8091</span> throw new RuntimeException("Unknown durability " + durability);<a name="line.8091"></a> -<span class="sourceLineNo">8092</span> }<a name="line.8092"></a> -<span class="sourceLineNo">8093</span> }<a name="line.8093"></a> -<span class="sourceLineNo">8094</span> }<a name="line.8094"></a> -<span class="sourceLineNo">8095</span><a name="line.8095"></a> -<span class="sourceLineNo">8096</span> /**<a name="line.8096"></a> -<span class="sourceLineNo">8097</span> * Check whether we should sync the wal from the table's durability settings<a name="line.8097"></a> -<span class="sourceLineNo">8098</span> */<a name="line.8098"></a> -<span class="sourceLineNo">8099</span> private boolean shouldSyncWAL() {<a name="line.8099"></a> -<span class="sourceLineNo">8100</span> return durability.ordinal() > Durability.ASYNC_WAL.ordinal();<a name="line.8100"></a> -<span class="sourceLineNo">8101</span> }<a name="line.8101"></a> -<span class="sourceLineNo">8102</span><a name="line.8102"></a> -<span class="sourceLineNo">8103</span> /**<a name="line.8103"></a> -<span class="sourceLineNo">8104</span> * A mocked list implementation - discards all updates.<a name="line.8104"></a> -<span class="sourceLineNo">8105</span> */<a name="line.8105"></a> -<span class="sourceLineNo">8106</span> private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {<a name="line.8106"></a> -<span class="sourceLineNo">8107</span><a name="line.8107"></a> -<span class="sourceLineNo">8108</span> @Override<a name="line.8108"></a> -<span class="sourceLineNo">8109</span> public void add(int index, Cell element) {<a name="line.8109"></a> -<span class="sourceLineNo">8110</span> // do nothing<a name="line.8110"></a> -<span class="sourceLineNo">8111</span> }<a name="line.8111"></a> -<span class="sourceLineNo">8112</span><a name="line.8112"></a> -<span class="sourceLineNo">8113</span> @Override<a name="line.8113"></a> -<span class="sourceLineNo">8114</span> public boolean addAll(int index, Collection<? extends Cell> c) {<a name="line.8114"></a> -<span class="sourceLineNo">8115</span> return false; // this list is never changed as a result of an update<a name="line.8115"></a> -<span class="sourceLineNo">8116</span> }<a name="line.8116"></a> -<span class="sourceLineNo">8117</span><a name="line.8117"></a> -<span class="sourceLineNo">8118</span> @Override<a name="line.8118"></a> -<span class="sourceLineNo">8119</span> public KeyValue get(int index) {<a name="line.8119"></a> -<span class="sourceLineNo">8120</span> throw new UnsupportedOperationException();<a name="line.8120"></a> -<span class="sourceLineNo">8121</span> }<a name="line.8121"></a> -<span class="sourceLineNo">8122</span><a name="line.8122"></a> -<span class="sourceLineNo">8123</span> @Override<a name="line.8123"></a> -<span class="sourceLineNo">8124</span> public int size() {<a name="line.8124"></a> -<span class="sourceLineNo">8125</span> return 0;<a name="line.8125"></a> -<span class="sourceLineNo">8126</span> }<a name="line.8126"></a> -<span class="sourceLineNo">8127</span> };<a name="line.8127"></a> -<span class="sourceLineNo">8128</span><a name="line.8128"></a> -<span class="sourceLineNo">8129</span> @Override<a name="line.8129"></a> -<span class="sourceLineNo">8130</span> public long getOpenSeqNum() {<a name="line.8130"></a> -<span class="sourceLineNo">8131</span> return this.openSeqNum;<a name="line.8131"></a> -<span class="sourceLineNo">8132</span> }<a name="line.8132"></a> -<span class="sourceLineNo">8133</span><a name="line.8133"></a> -<span class="sourceLineNo">8134</span> @Override<a name="line.8134"></a> -<span class="sourceLineNo">8135</span> public Map<byte[], Long> getMaxStoreSeqId() {<a name="line.8135"></a> -<span class="sourceLineNo">8136</span> return this.maxSeqIdInStores;<a name="line.8136"></a> -<span class="sourceLineNo">8137</span> }<a name="line.8137"></a> -<span class="sourceLineNo">8138</span><a name="line.8138"></a> -<span class="sourceLineNo">8139</span> @Override<a name="line.8139"></a> -<span class="sourceLineNo">8140</span> public long getOldestSeqIdOfStore(byte[] familyName) {<a name="line.8140"></a> -<span class="sourceLineNo">8141</span> return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);<a name="line.8141"></a> -<span class="sourceLineNo">8142</span> }<a name="line.8142"></a> -<span class="sourceLineNo">8143</span><a name="line.8143"></a> -<span class="sourceLineNo">8144</span> @Override<a name="line.8144"></a> -<span class="sourceLineNo">8145</span> public CompactionState getCompactionState() {<a name="line.8145"></a> -<span class="sourceLineNo">8146</span> boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;<a name="line.8146"></a> -<span class="sourceLineNo">8147</span> return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)<a name="line.8147"></a> -<span class="sourceLineNo">8148</span> : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));<a name="line.8148"></a> -<span class="sourceLineNo">8149</span> }<a name="line.8149"></a> -<span class="sourceLineNo">8150</span><a name="line.8150"></a> -<span class="sourceLineNo">8151</span> public void reportCompactionRequestStart(boolean isMajor){<a name="line.8151"></a> -<span class="sourceLineNo">8152</span> (isMajor ? majorInProgress : minorInProgress).incrementAndGet();<a name="line.8152"></a> -<span class="sourceLineNo">8153</span> }<a name="line.8153"></a> -<span class="sourceLineNo">8154</span><a name="line.8154"></a> -<span class="sourceLineNo">8155</span> public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {<a name="line.8155"></a> -<span class="sourceLineNo">8156</span> int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();<a name="line.8156"></a> -<span class="sourceLineNo">8157</span><a name="line.8157"></a> -<span class="sourceLineNo">8158</span> // metrics<a name="line.8158"></a> -<span class="sourceLineNo">8159</span> compactionsFinished.incrementAndGet();<a name="line.8159"></a> -<span class="sourceLineNo">8160</span> compactionNumFilesCompacted.addAndGet(numFiles);<a name="line.8160"></a> -<span class="sourceLineNo">8161</span> compactionNumBytesCompacted.addAndGet(filesSizeCompacted);<a name="line.8161"></a> -<span class="sourceLineNo">8162</span><a name="line.8162"></a> -<span class="sourceLineNo">8163</span> assert newValue >= 0;<a name="line.8163"></a> -<span class="sourceLineNo">8164</span> }<a name="line.8164"></a> -<span class="sourceLineNo">8165</span><a name="line.8165"></a> -<span class="sourceLineNo">8166</span> public void reportCompactionRequestFailure() {<a name="line.8166"></a> -<span class="sourceLineNo">8167</span> compactionsFailed.incrementAndGet();<a name="line.8167"></a> -<span class="sourceLineNo">8168</span> }<a name="line.8168"></a> -<span class="sourceLineNo">8169</span><a name="line.8169"></a> -<span class="sourceLineNo">8170</span> public void incrementCompactionsQueuedCount() {<a name="line.8170"></a> -<span class="sourceLineNo">8171</span> compactionsQueued.incrementAndGet();<a name="line.8171"></a> -<span class="sourceLineNo">8172</span> }<a name="line.8172"></a> -<span class="sourceLineNo">8173</span><a name="line.8173"></a> -<span class="sourceLineNo">8174</span> public void decrementCompactionsQueuedCount() {<a name="line.8174"></a> -<span class="sourceLineNo">8175</span> compactionsQueued.decrementAndGet();<a name="line.8175"></a> -<span class="sourceLineNo">8176</span> }<a name="line.8176"></a> -<span class="sourceLineNo">8177</span><a name="line.8177"></a> -<span class="sourceLineNo">8178</span> public void incrementFlushesQueuedCount() {<a name="line.8178"></a> -<span class="sourceLineNo">8179</span> flushesQueued.incrementAndGet();<a name="line.8179"></a> -<span class="sourceLineNo">8180</span> }<a name="line.8180"></a> -<span class="sourceLineNo">8181</span><a name="line.8181"></a> -<span class="sourceLineNo">8182</span> @VisibleForTesting<a name="line.8182"></a> -<span class="sourceLineNo">8183</span> public long getReadPoint() {<a name="line.8183"></a> -<span class="sourceLineNo">8184</span> return getReadPoint(IsolationLevel.READ_COMMITTED);<a name="line.8184"></a> +<span class="sourceLineNo">8032</span> long mutationSize = 0;<a name="line.8032"></a> +<span class="sourceLineNo">8033</span> for (List<Cell> cells: familyMap.values()) {<a name="line.8033"></a> +<span class="sourceLineNo">8034</span> assert cells instanceof RandomAccess;<a name="line.8034"></a> +<span class="sourceLineNo">8035</span> int listSize = cells.size();<a name="line.8035"></a> +<span class="sourceLineNo">8036</span> for (int i=0; i < listSize; i++) {<a name="line.8036"></a> +<span class="sourceLineNo">8037</span> Cell cell = cells.get(i);<a name="line.8037"></a> +<span class="sourceLineNo">8038</span> mutationSize += KeyValueUtil.length(cell);<a name="line.8038"></a> +<span class="sourceLineNo">8039</span> }<a name="line.8039"></a> +<span class="sourceLineNo">8040</span> }<a name="line.8040"></a> +<span class="sourceLineNo">8041</span><a name="line.8041"></a> +<span class="sourceLineNo">8042</span> dataInMemoryWithoutWAL.add(mutationSize);<a name="line.8042"></a> +<span class="sourceLineNo">8043</span> }<a name="line.8043"></a> +<span class="sourceLineNo">8044</span><a name="line.8044"></a> +<span class="sourceLineNo">8045</span> private void lock(final Lock lock)<a name="line.8045"></a> +<span class="sourceLineNo">8046</span> throws RegionTooBusyException, InterruptedIOException {<a name="line.8046"></a> +<span class="sourceLineNo">8047</span> lock(lock, 1);<a name="line.8047"></a> +<span class="sourceLineNo">8048</span> }<a name="line.8048"></a> +<span class="sourceLineNo">8049</span><a name="line.8049"></a> +<span class="sourceLineNo">8050</span> /**<a name="line.8050"></a> +<span class="sourceLineNo">8051</span> * Try to acquire a lock. Throw RegionTooBusyException<a name="line.8051"></a> +<span class="sourceLineNo">8052</span> * if failed to get the lock in time. Throw InterruptedIOException<a name="line.8052"></a> +<span class="sourceLineNo">8053</span> * if interrupted while waiting for the lock.<a name="line.8053"></a> +<span class="sourceLineNo">8054</span> */<a name="line.8054"></a> +<span class="sourceLineNo">8055</span> private void lock(final Lock lock, final int multiplier)<a name="line.8055"></a> +<span class="sourceLineNo">8056</span> throws RegionTooBusyException, InterruptedIOException {<a name="line.8056"></a> +<span class="sourceLineNo">8057</span> try {<a name="line.8057"></a> +<span class="sourceLineNo">8058</span> final long waitTime = Math.min(maxBusyWaitDuration,<a name="line.8058"></a> +<span class="sourceLineNo">8059</span> busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));<a name="line.8059"></a> +<span class="sourceLineNo">8060</span> if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {<a name="line.8060"></a> +<span class="sourceLineNo">8061</span> throw new RegionTooBusyException(<a name="line.8061"></a> +<span class="sourceLineNo">8062</span> "failed to get a lock in " + waitTime + " ms. " +<a name="line.8062"></a> +<span class="sourceLineNo">8063</span> "regionName=" + (this.getRegionInfo() == null ? "unknown" :<a name="line.8063"></a> +<span class="sourceLineNo">8064</span> this.getRegionInfo().getRegionNameAsString()) +<a name="line.8064"></a> +<span class="sourceLineNo">8065</span> ", server=" + (this.getRegionServerServices() == null ? "unknown" :<a name="line.8065"></a> +<span class="sourceLineNo">8066</span> this.getRegionServerServices().getServerName()));<a name="line.8066"></a> +<span class="sourceLineNo">8067</span> }<a name="line.8067"></a> +<span class="sourceLineNo">8068</span> } catch (InterruptedException ie) {<a name="line.8068"></a> +<span class="sourceLineNo">8069</span> LOG.info("Interrupted while waiting for a lock");<a name="line.8069"></a> +<span class="sourceLineNo">8070</span> InterruptedIOException iie = new InterruptedIOException();<a name="line.8070"></a> +<span class="sourceLineNo">8071</span> iie.initCause(ie);<a name="line.8071"></a> +<span class="sourceLineNo">8072</span> throw iie;<a name="line.8072"></a> +<span class="sourceLineNo">8073</span> }<a name="line.8073"></a> +<span class="sourceLineNo">8074</span> }<a name="line.8074"></a> +<span class="sourceLineNo">8075</span><a name="line.8075"></a> +<span class="sourceLineNo">8076</span> /**<a name="line.8076"></a> +<span class="sourceLineNo">8077</span> * Calls sync with the given transaction ID<a name="line.8077"></a> +<span class="sourceLineNo">8078</span> * @param txid should sync up to which transaction<a name="line.8078"></a> +<span class="sourceLineNo">8079</span> * @throws IOException If anything goes wrong with DFS<a name="line.8079"></a> +<span class="sourceLineNo">8080</span> */<a name="line.8080"></a> +<span class="sourceLineNo">8081</span> private void sync(long txid, Durability durability) throws IOException {<a name="line.8081"></a> +<span class="sourceLineNo">8082</span> if (this.getRegionInfo().isMetaRegion()) {<a name="line.8082"></a> +<span class="sourceLineNo">8083</span> this.wal.sync(txid);<a name="line.8083"></a> +<span class="sourceLineNo">8084</span> } else {<a name="line.8084"></a> +<span class="sourceLineNo">8085</span> switch(durability) {<a name="line.8085"></a> +<span class="sourceLineNo">8086</span> case USE_DEFAULT:<a name="line.8086"></a> +<span class="sourceLineNo">8087</span> // do what table defaults to<a name="line.8087"></a> +<span class="sourceLineNo">8088</span> if (shouldSyncWAL()) {<a name="line.8088"></a> +<span class="sourceLineNo">8089</span> this.wal.sync(txid);<a name="line.8089"></a> +<span class="sourceLineNo">8090</span> }<a name="line.8090"></a> +<span class="sourceLineNo">8091</span> break;<a name="line.8091"></a> +<span class="sourceLineNo">8092</span> case SKIP_WAL:<a name="line.8092"></a> +<span class="sourceLineNo">8093</span> // nothing do to<a name="line.8093"></a> +<span class="sourceLineNo">8094</span> break;<a name="line.8094"></a> +<span class="sourceLineNo">8095</span> case ASYNC_WAL:<a name="line.8095"></a> +<span class="sourceLineNo">8096</span> // nothing do to<a name="line.8096"></a> +<span class="sourceLineNo">8097</span> break;<a name="line.8097"></a> +<span class="sourceLineNo">8098</span> case SYNC_WAL:<a name="line.8098"></a> +<span class="sourceLineNo">8099</span> case FSYNC_WAL:<a name="line.8099"></a> +<span class="sourceLineNo">8100</span> // sync the WAL edit (SYNC and FSYNC treated the same for now)<a name="line.8100"></a> +<span class="sourceLineNo">8101</span> this.wal.sync(txid);<a name="line.8101"></a> +<span class="sourceLineNo">8102</span> break;<a name="line.8102"></a> +<span class="sourceLineNo">8103</span> default:<a name="line.8103"></a> +<span class="sourceLineNo">8104</span> throw new RuntimeException("Unknown durability " + durability);<a name="line.8104"></a> +<span class="sourceLineNo">8105</span> }<a name="line.8105"></a> +<span class="sourceLineNo">8106</span> }<a name="line.8106"></a> +<span class="sourceLineNo">8107</span> }<a name="line.8107"></a> +<span class="sourceLineNo">8108</span><a name="line.8108"></a> +<span class="sourceLineNo">8109</span> /**<a name="line.8109"></a> +<span class="sourceLineNo">8110</span> * Check whether we should sync the wal from the table's durability settings<a name="line.8110"></a> +<span class="sourceLineNo">8111</span> */<a name="line.8111"></a> +<span class="sourceLineNo">8112</span> private boolean shouldSyncWAL() {<a name="line.8112"></a> +<span class="sourceLineNo">8113</span> return durability.ordinal() > Durability.ASYNC_WAL.ordinal();<a name="line.8113"></a> +<span class="sourceLineNo">8114</span> }<a name="line.8114"></a> +<span class="sourceLineNo">8115</span><a name="line.8115"></a> +<span class="sourceLineNo">8116</span> /**<a name="line.8116"></a> +<span class="sourceLineNo">8117</span> * A mocked list implementation - discards all updates.<a name="line.8117"></a> +<span class="sourceLineNo">8118</span> */<a name="line.8118"></a> +<span class="sourceLineNo">8119</span> private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {<a name="line.8119"></a> +<span class="sourceLineNo">8120</span><a name="line.8120"></a> +<span class="sourceLineNo">8121</span> @Override<a name="line.8121"></a> +<span class="sourceLineNo">8122</span> public void add(int index, Cell element) {<a name="line.8122"></a> +<span class="sourceLineNo">8123</span> // do nothing<a name="line.8123"></a> +<span class="sourceLineNo">8124</span> }<a name="line.8124"></a> +<span class="sourceLineNo">8125</span><a name="line.8125"></a> +<span class="sourceLineNo">8126</span> @Override<a name="line.8126"></a> +<span class="sourceLineNo">8127</span> public boolean addAll(int index, Collection<? extends Cell> c) {<a name="line.8127"></a> +<span class="sourceLineNo">8128</span> return false; // this list is never changed as a result of an update<a name="line.8128"></a> +<span class="sourceLineNo">8129</span> }<a name="line.8129"></a> +<span class="sourceLineNo">8130</span><a name="line.8130"></a> +<span class="sourceLineNo">8131</span> @Override<a name="line.8131"></a> +<span class="sourceLineNo">8132</span> public KeyValue get(int index) {<a name="line.8132"></a> +<span class="sourceLineNo">8133</span> throw new UnsupportedOperationException();<a name="line.8133"></a> +<span class="sourceLineNo">8134</span> }<a name="line.8134"></a> +<span class="sourceLineNo">8135</span><a name="line.8135"></a> +<span class="sourceLineNo">8136</span> @Override<a name="line.8136"></a> +<span class="sourceLineNo">8137</span> public int size() {<a name="line.8137"></a> +<span class="sourceLineNo">8138</span> return 0;<a name="line.8138"></a> +<span class="sourceLineNo">8139</span> }<a name="line.8139"></a> +<span class="sourceLineNo">8140</span> };<a name="line.8140"></a> +<span class="sourceLineNo">8141</span><a name="line.8141"></a> +<span class="sourceLineNo">8142</span> @Override<a name="line.8142"></a> +<span class="sourceLineNo">8143</span> public long getOpenSeqNum() {<a name="line.8143"></a> +<span class="sourceLineNo">8144</span> return this.openSeqNum;<a name="line.8144"></a> +<span class="sourceLineNo">8145</span> }<a name="line.8145"></a> +<span class="sourceLineNo">8146</span><a name="line.8146"></a> +<span class="sourceLineNo">8147</span> @Override<a name="line.8147"></a> +<span class="sourceLineNo">8148</span> public Map<byte[], Long> getMaxStoreSeqId() {<a name="line.8148"></a> +<span class="sourceLineNo">8149</span> return this.maxSeqIdInStores;<a name="line.8149"></a> +<span class="sourceLineNo">8150</span> }<a name="line.8150"></a> +<span class="sourceLineNo">8151</span><a name="line.8151"></a> +<span class="sourceLineNo">8152</span> @Override<a name="line.8152"></a> +<span class="sourceLineNo">8153</span> public long getOldestSeqIdOfStore(byte[] familyName) {<a name="line.8153"></a> +<span class="sourceLineNo">8154</span> return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);<a name="line.8154"></a> +<span class="sourceLineNo">8155</span> }<a name="line.8155"></a> +<span class="sourceLineNo">8156</span><a name="line.8156"></a> +<span class="sourceLineNo">8157</span> @Override<a name="line.8157"></a> +<span class="sourceLineNo">8158</span> public CompactionState getCompactionState() {<a name="line.8158"></a> +<span class="sourceLineNo">8159</span> boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;<a name="line.8159"></a> +<span class="sourceLineNo">8160</span> return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)<a name="line.8160"></a> +<span class="sourceLineNo">8161</span> : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));<a name="line.8161"></a> +<span class="sourceLineNo">8162</span> }<a name="line.8162"></a> +<span class="sourceLineNo">8163</span><a name="line.8163"></a> +<span class="sourceLineNo">8164</span> public void reportCompactionRequestStart(boolean isMajor){<a name="line.8164"></a> +<span class="sourceLineNo">8165</span> (isMajor ? majorInProgress : minorInProgress).incrementAndGet();<a name="line.8165"></a> +<span class="sourceLineNo">8166</span> }<a name="line.8166"></a> +<span class="sourceLineNo">8167</span><a name="line.8167"></a> +<span class="sourceLineNo">8168</span> public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {<a name="line.8168"></a> +<span class="sourceLineNo">8169</span> int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();<a name="line.8169"></a> +<span class="sourceLineNo">8170</span><a name="line.8170"></a> +<span class="sourceLineNo">8171</span> // metrics<a name="line.8171"></a> +<span class="sourceLineNo">8172</span> compactionsFinished.incrementAndGet();<a name="line.8172"></a> +<span class="sourceLineNo">8173</span> compactionNumFilesCompacted.addAndGet(numFiles);<a name="line.8173"></a> +<span class="sourceLineNo">8174</span> compactionNumBytesCompacted.addAndGet(filesSizeCompacted);<a name="line.8174"></a> +<span class="sourceLineNo">8175</span><a name="line.8175"></a> +<span class="sourceLineNo">8176</span> assert newValue >= 0;<a name="line.8176"></a> +<span class="sourceLineNo">8177</span> }<a name="line.8177"></a> +<span class="sourceLineNo">8178</span><a name="line.8178"></a> +<span class="sourceLineNo">8179</span> public void reportCompactionRequestFailure() {<a name="line.8179"></a> +<span class="sourceLineNo">8180</span> compactionsFailed.incrementAndGet();<a name="line.8180"></a> +<span class="sourceLineNo">8181</span> }<a name="line.8181"></a> +<span class="sourceLineNo">8182</span><a name="line.8182"></a> +<span class="sourceLineNo">8183</span> public void incrementCompactionsQueuedCount() {<a name="line.8183"></a> +<span class="sourceLineNo">8184</span> compactionsQueued.incrementAndGet();<a name="line.8184"></a> <span class="sourceLineNo">8185</span> }<a name="line.8185"></a> <span class="sourceLineNo">8186</span><a name="line.8186"></a> -<span class="sourceLineNo">8187</span> /**<a name="line.8187"></a> -<span class="sourceLineNo">8188</span> * {@inheritDoc}<a name="line.8188"></a> -<span class="sourceLineNo">8189</span> */<a name="line.8189"></a> -<span class="sourceLineNo">8190</span> @Override<a name="line.8190"></a> -<span class="sourceLineNo">8191</span> public void onConfigurationChange(Configuration conf) {<a name="line.8191"></a> -<span class="sourceLineNo">8192</span> // Do nothing for now.<a name="line.8192"></a> +<span class="sourceLineNo">8187</span> public void decrementCompactionsQueuedCount() {<a name="line.8187"></a> +<span class="sourceLineNo">8188</span> compactionsQueued.decrementAndGet();<a name="line.8188"></a> +<span class="sourceLineNo">8189</span> }<a name="line.8189"></a> +<span class="sourceLineNo">8190</span><a name="line.8190"></a> +<span class="sourceLineNo">8191</span> public void incrementFlushesQueuedCount() {<a name="line.8191"></a> +<span class="sourceLineNo">8192</span> flushesQueued.incrementAndGet();<a name="line.8192"></a> <span class="sourceLineNo">8193</span> }<a name="line.8193"></a> <span class="sourceLineNo">8194</span><a name="line.8194"></a> -<span class="sourceLineNo">8195</span> /**<a name="line.8195"></a> -<span class="sourceLineNo">8196</span> * {@inheritDoc}<a name="line.8196"></a> -<span class="sourceLineNo">8197</span> */<a name="line.8197"></a> -<span class="sourceLineNo">8198</span> @Override<a name="line.8198"></a> -<span class="sourceLineNo">8199</span> public void registerChildren(ConfigurationManager manager) {<a name="line.8199"></a> -<span class="sourceLineNo">8200</span> configurationManager = Optional.of(manager);<a name="line.8200"></a> -<span class="sourceLineNo">8201</span> for (Store s : this.stores.values()) {<a name="line.8201"></a> -<span class="sourceLineNo">8202</span> configurationManager.get().registerObserver(s);<a name="line.8202"></a> -<span class="sourceLineNo">8203</span> }<a name="line.8203"></a> -<span class="sourceLineNo">8204</span> }<a name="line.8204"></a> -<span class="sourceLineNo">8205</span><a name="line.8205"></a> -<span class="sourceLineNo">8206</span> /**<a name="line.8206"></a> -<span class="sourceLineNo">8207</span> * {@inheritDoc}<a name="line.8207"></a> -<span class="sourceLineNo">8208</span> */<a name="line.8208"></a> -<span class="sourceLineNo">8209</span> @Override<a name="line.8209"></a> -<span class="sourceLineNo">8210</span> public void deregisterChildren(ConfigurationManager manager) {<a name="line.8210"></a> -<span class="sourceLineNo">8211</span> for (Store s : this.stores.values()) {<a name="line.8211"></a> -<span class="sourceLineNo">8212</span> configurationManager.get().deregisterObserver(s);<a name="line.8212"></a> -<span class="sourceLineNo">8213</span> }<a name="line.8213"></a> -<span class="sourceLineNo">8214</span> }<a name="line.8214"></a> -<span class="sourceLineNo">8215</span><a name="line.8215"></a> -<span class="sourceLineNo">8216</span> @Override<a name="line.8216"></a> -<span class="sourceLineNo">8217</span> public CellComparator getCellComparator() {<a name="line.8217"></a> -<span class="sourceLineNo">8218</span> return this.getRegionInfo().isMetaRegion() ? CellComparator.META_COMPARATOR<a name="line.8218"></a> -<span class="sourceLineNo">8219</span> : CellComparator.COMPARATOR;<a name="line.8219"></a> -<span class="sourceLineNo">8220</span> }<a name="line.8220"></a> -<span class="sourceLineNo">8221</span><a name="line.8221"></a> -<span class="sourceLineNo">8222</span> public long getMemstoreFlushSize() {<a name="line.8222"></a> -<span class="sourceLineNo">8223</span> return this.memstoreFlushSize;<a name="line.8223"></a> -<span class="sourceLineNo">8224</span> }<a name="line.8224"></a> -<span class="sourceLineNo">8225</span><a name="line.8225"></a> -<span class="sourceLineNo">8226</span> //// method for debugging tests<a name="line.8226"></a> -<span class="sourceLineNo">8227</span> void throwException(String title, String regionName) {<a name="line.8227"></a> -<span class="sourceLineNo">8228</span> StringBuffer buf = new StringBuffer();<a name="line.8228"></a> -<span class="sourceLineNo">8229</span> buf.append(title + ", ");<a name="line.8229"></a> -<span class="sourceLineNo">8230</span> buf.append(getRegionInfo().toString());<a name="line.8230"></a> -<span class="sourceLineNo">8231</span> buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");<a name="line.8231"></a> -<span class="sourceLineNo">8232</span> buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");<a name="line.8232"></a> -<span class="sourceLineNo">8233</span> buf.append("stores: ");<a name="line.8233"></a> -<span class="sourceLineNo">8234</span> for (Store s : getStores()) {<a name="line.8234"></a> -<span class="sourceLineNo">8235</span> buf.append(s.getColumnFamilyDescriptor().getNameAsString());<a name="line.8235"></a> -<span class="sourceLineNo">8236</span> buf.append(" size: ");<a name="line.8236"></a> -<span class="sourceLineNo">8237</span> buf.append(s.getSizeOfMemStore().getDataSize());<a name="line.8237"></a> -<span class="sourceLineNo">8238</span> buf.append(" ");<a name="line.8238"></a> -<span class="sourceLineNo">8239</span> }<a name="line.8239"></a> -<span class="sourceLineNo">8240</span> buf.append("end-of-stores");<a name="line.8240"></a> -<span class="sourceLineNo">8241</span> buf.append(", memstore size ");<a name="line.8241"></a> -<span class="sourceLineNo">8242</span> buf.append(getMemstoreSize());<a name="line.8242"></a> -<span class="sourceLineNo">8243</span> if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) {<a name="line.8243"></a> -<span class="sourceLineNo">8244</span> throw new RuntimeException(buf.toString());<a name="line.8244"></a> -<span class="sourceLineNo">8245</span> }<a name="line.8245"></a> -<span class="sourceLineNo">8246</span> }<a name="line.8246"></a> -<span class="sourceLineNo">8247</span>}<a name="line.8247"></a> +<span class="sourceLineNo">8195</span> @VisibleForTesting<a name="line.8195"></a> +<span class="sourceLineNo">8196</span> public long getReadPoint() {<a name="line.8196"></a> +<span class="sourceLineNo">8197</span> return getReadPoint(IsolationLevel.READ_COMMITTED);<a name="line.8197"></a> +<span class="sourceLineNo">8198</span> }<a name="line.8198"></a> +<span class="sourceLineNo">8199</span><a name="line.8199"></a> +<span class="sourceLineNo">8200</span> /**<a name="line.8200"></a> +<span class="sourceLineNo">8201</span> * {@inheritDoc}<a name="line.8201"></a> +<span class="sourceLineNo">8202</span> */<a name="line.8202"></a> +<span class="sourceLineNo">8203</span> @Override<a name="line.8203"></a> +<span class="sourceLineNo">8204</span> public void onConfigurationChange(Configuration conf) {<a name="line.8204"></a> +<span class="sourceLineNo">8205</span> // Do nothing for now.<a name="line.8205"></a> +<span class="sourceLineNo">8206</span> }<a name="line.8206"></a> +<span class="sourceLineNo">8207</span><a name="line.8207"></a> +<span class="sourceLineNo">8208</span> /**<a name="line.8208"></a> +<span class="sourceLineNo">8209</span> * {@inheritDoc}<a name="line.8209"></a> +<span class="sourceLineNo">8210</span> */<a name="line.8210"></a> +<span class="sourceLineNo">8211</span> @Override<a name="line.8211"></a> +<span class="sourceLineNo">8212</span> public void registerChildren(ConfigurationManager manager) {<a name="line.8212"></a> +<span class="sourceLineNo">8213</span> configurationManager = Optional.of(manager);<a name="line.8213"></a> +<span class="sourceLineNo">8214</span> for (Store s : this.stores.values()) {<a name="line.8214"></a> +<span class="sourceLineNo">8215</span> configurationManager.get().registerObserver(s);<a name="line.8215"></a> +<span class="sourceLineNo">8216</span> }<a name="line.8216"></a> +<span class="sourceLineNo">8217</span> }<a name="line.8217"></a> +<span class="sourceLineNo">8218</span><a name="line.8218"></a> +<span class="sourceLineNo">8219</span> /**<a name="line.8219"></a> +<span class="sourceLineNo">8220</span> * {@inheritDoc}<a name="line.8220"></a> +<span class="sourceLineNo">8221</span> */<a name="line.8221"></a> +<span class="sourceLineNo">8222</span> @Override<a name="line.8222"></a> +<span class="sourceLineNo">8223</span> public void deregisterChildren(ConfigurationManager manager) {<a name="line.8223"></a> +<span class="sourceLineNo">8224</span> for (Store s : this.stores.values()) {<a name="line.8224"></a> +<span class="sourceLineNo">8225</span> configurationManager.get().deregisterObserver(s);<a name="line.8225"></a> +<span class="sourceLineNo">8226</span> }<a name="line.8226"></a> +<span class="sourceLineNo">8227</span> }<a name="line.8227"></a> +<span class="sourceLineNo">8228</span><a name="line.8228"></a> +<span class="sourceLineNo">8229</span> @Override<a name="line.8229"></a> +<span class="sourceLineNo">8230</span> public CellComparator getCellComparator() {<a name="line.8230"></a> +<span class="sourceLineNo">8231</span> return this.getRegionInfo().isMetaRegion() ? CellComparator.META_COMPARATOR<a name="line.8231"></a> +<span class="sourceLineNo">8232</span> : CellComparator.COMPARATOR;<a name="line.8232"></a> +<span class="sourceLineNo">8233</span> }<a name="line.8233"></a> +<span class="sourceLineNo">8234</span><a name="line.8234"></a> +<span class="sourceLineNo">8235</span> public long getMemstoreFlushSize() {<a name="line.8235"></a> +<span class="sourceLineNo">8236</span> return this.memstoreFlushSize;<a name="line.8236"></a> +<span class="sourceLineNo">8237</span> }<a name="line.8237"></a> +<span class="sourceLineNo">8238</span><a name="line.8238"></a> +<span class="sourceLineNo">8239</span> //// method for debugging tests<a name="line.8239"></a> +<span class="sourceLineNo">8240</span> void throwException(String title, String regionName) {<a name="line.8240"></a> +<span class="sourceLineNo">8241</span> StringBuffer buf = new StringBuffer();<a name="line.8241"></a> +<span class="sourceLineNo">8242</span> buf.append(title + ", ");<a name="line.8242"></a> +<span class="sourceLineNo">8243</span> buf.append(getRegionInfo().toString());<a name="line.8243"></a> +<span class="sourceLineNo">8244</span> buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");<a name="line.8244"></a> +<span class="sourceLineNo">8245</span> buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");<a name="line.8245"></a> +<span class="sourceLineNo">8246</span> buf.append("stores: ");<a name="line.8246"></a> +<span class="sourceLineNo">8247</span> for (Store s : getStores()) {<a name="line.8247"></a> +<span class="sourceLineNo">8248</span> buf.append(s.getColumnFamilyDescriptor().getNameAsString());<a name="line.8248"></a> +<span class="sourceLineNo">8249</span> buf.append(" size: ");<a name="line.8249"></a> +<span class="sourceLineNo">8250</span> buf.append(s.getSizeOfMemStore().getDataSize());<a name="line.8250"></a> +<span class="sourceLineNo">8251</span> buf.append(" ");<a name="line.8251"></a> +<span class="sourceLineNo">8252</span> }<a name="line.8252"></a> +<span class="sourceLineNo">8253</span> buf.append("end-of-stores");<a name="line.8253"></a> +<span class="sourceLineNo">8254</span> buf.append(", memstore size ");<a name="line.8254"></a> +<span class="sourceLineNo">8255</span> buf.append(getMemstoreSize());<a name="line.8255"></a> +<span class="sourceLineNo">8256</span> if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) {<a name="line.8256"></a> +<span class="sourceLineNo">8257</span> throw new RuntimeException(buf.toString());<a name="line.8257"></a> +<span class="sourceLineNo">8258</span> }<a name="line.8258"></a> +<span class="sourceLineNo">8259</span> }<a name="line.8259"></a> +<span class="sourceLineNo">8260</span>}<a name="line.8260"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/99c53df1/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html index f2ed401..29c147d 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html @@ -2494,127 +2494,143 @@ <span class="sourceLineNo">2486</span> return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;<a name="line.2486"></a> <span class="sourceLineNo">2487</span> }<a name="line.2487"></a> <span class="sourceLineNo">2488</span><a name="line.2488"></a> -<span class="sourceLineNo">2489</span> @Override<a name="line.2489"></a> -<span class="sourceLineNo">2490</span> public synchronized void closeAndArchiveCompactedFiles() throws IOException {<a name="line.2490"></a> -<span class="sourceLineNo">2491</span> // ensure other threads do not attempt to archive the same files on close()<a name="line.2491"></a> -<span class="sourceLineNo">2492</span> archiveLock.lock();<a name="line.2492"></a> -<span class="sourceLineNo">2493</span> try {<a name="line.2493"></a> -<span class="sourceLineNo">2494</span> lock.readLock().lock();<a name="line.2494"></a> -<span class="sourceLineNo">2495</span> Collection<StoreFile> copyCompactedfiles = null;<a name="line.2495"></a> -<span class="sourceLineNo">2496</span> try {<a name="line.2496"></a> -<span class="sourceLineNo">2497</span> Collection<StoreFile> compactedfiles =<a name="line.2497"></a> -<span class="sourceLineNo">2498</span> this.getStoreEngine().getStoreFileManager().getCompactedfiles();<a name="line.2498"></a> -<span class="sourceLineNo">2499</span> if (compactedfiles != null && compactedfiles.size() != 0) {<a name="line.2499"></a> -<span class="sourceLineNo">2500</span> // Do a copy under read lock<a name="line.2500"></a> -<span class="sourceLineNo">2501</span> copyCompactedfiles = new ArrayList<>(compactedfiles);<a name="line.2501"></a> -<span class="sourceLineNo">2502</span> } else {<a name="line.2502"></a> -<span class="sourceLineNo">2503</span> if (LOG.isTraceEnabled()) {<a name="line.2503"></a> -<span class="sourceLineNo">2504</span> LOG.trace("No compacted files to archive");<a name="line.2504"></a> -<span class="sourceLineNo">2505</span> return;<a name="line.2505"></a> -<span class="sourceLineNo">2506</span> }<a name="line.2506"></a> -<span class="sourceLineNo">2507</span> }<a name="line.2507"></a> -<span class="sourceLineNo">2508</span> } finally {<a name="line.2508"></a> -<span class="sourceLineNo">2509</span> lock.readLock().unlock();<a name="line.2509"></a> -<span class="sourceLineNo">2510</span> }<a name="line.2510"></a> -<span class="sourceLineNo">2511</span> if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {<a name="line.2511"></a> -<span class="sourceLineNo">2512</span> removeCompactedfiles(copyCompactedfiles);<a name="line.2512"></a> -<span class="sourceLineNo">2513</span> }<a name="line.2513"></a> -<span class="sourceLineNo">2514</span> } finally {<a name="line.2514"></a> -<span class="sourceLineNo">2515</span> archiveLock.unlock();<a name="line.2515"></a> -<span class="sourceLineNo">2516</span> }<a name="line.2516"></a> -<span class="sourceLineNo">2517</span> }<a name="line.2517"></a> -<span class="sourceLineNo">2518</span><a name="line.2518"></a> -<span class="sourceLineNo">2519</span> /**<a name="line.2519"></a> -<span class="sourceLineNo">2520</span> * Archives and removes the compacted files<a name="line.2520"></a> -<span class="sourceLineNo">2521</span> * @param compactedfiles The compacted files in this store that are not active in reads<a name="line.2521"></a> -<span class="sourceLineNo">2522</span> * @throws IOException<a name="line.2522"></a> -<span class="sourceLineNo">2523</span> */<a name="line.2523"></a> -<span class="sourceLineNo">2524</span> private void removeCompactedfiles(Collection<StoreFile> compactedfiles)<a name="line.2524"></a> -<span class="sourceLineNo">2525</span> throws IOException {<a name="line.2525"></a> -<span class="sourceLineNo">2526</span> final List<StoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());<a name="line.2526"></a> -<span class="sourceLineNo">2527</span> for (final StoreFile file : compactedfiles) {<a name="line.2527"></a> -<span class="sourceLineNo">2528</span> synchronized (file) {<a name="line.2528"></a> -<span class="sourceLineNo">2529</span> try {<a name="line.2529"></a> -<span class="sourceLineNo">2530</span> StoreFileReader r = file.getReader();<a name="line.2530"></a> -<span class="sourceLineNo">2531</span> if (r == null) {<a name="line.2531"></a> -<span class="sourceLineNo">2532</span> if (LOG.isDebugEnabled()) {<a name="line.2532"></a> -<span class="sourceLineNo">2533</span> LOG.debug("The file " + file + " was closed but still not archived.");<a name="line.2533"></a> -<span class="sourceLineNo">2534</span> }<a name="line.2534"></a> -<span class="sourceLineNo">2535</span> filesToRemove.add(file);<a name="line.2535"></a> -<span class="sourceLineNo">2536</span> continue;<a name="line.2536"></a> -<span class="sourceLineNo">2537</span> }<a name="line.2537"></a> -<span class="sourceLineNo">2538</span> if (file.isCompactedAway() && !file.isReferencedInReads()) {<a name="line.2538"></a> -<span class="sourceLineNo">2539</span> // Even if deleting fails we need not bother as any new scanners won't be<a name="line.2539"></a> -<span class="sourceLineNo">2540</span> // able to use the compacted file as the status is already compactedAway<a name="line.2540"></a> -<span class="sourceLineNo">2541</span> if (LOG.isTraceEnabled()) {<a name="line.2541"></a> -<span class="sourceLineNo">2542</span> LOG.trace("Closing and archiving the file " + file.getPath());<a name="line.2542"></a> -<span class="sourceLineNo">2543</span> }<a name="line.2543"></a> -<span class="sourceLineNo">2544</span> r.close(true);<a name="line.2544"></a> -<span class="sourceLineNo">2545</span> // Just close and return<a name="line.2545"></a> -<span class="sourceLineNo">2546</span> filesToRemove.add(file);<a name="line.2546"></a> -<span class="sourceLineNo">2547</span> }<a name="line.2547"></a> -<span class="sourceLineNo">2548</span> } catch (Exception e) {<a name="line.2548"></a> -<span class="sourceLineNo">2549</span> LOG.error(<a name="line.2549"></a> -<span class="sourceLineNo">2550</span> "Exception while trying to close the compacted store file " + file.getPath().getName());<a name="line.2550"></a> -<span class="sourceLineNo">2551</span> }<a name="line.2551"></a> -<span class="sourceLineNo">2552</span> }<a name="line.2552"></a> -<span class="sourceLineNo">2553</span> }<a name="line.2553"></a> -<span class="sourceLineNo">2554</span> if (this.isPrimaryReplicaStore()) {<a name="line.2554"></a> -<span class="sourceLineNo">2555</span> // Only the primary region is allowed to move the file to archive.<a name="line.2555"></a> -<span class="sourceLineNo">2556</span> // The secondary region does not move the files to archive. Any active reads from<a name="line.2556"></a> -<span class="sourceLineNo">2557</span> // the secondary region will still work because the file as such has active readers on it.<a name="line.2557"></a> -<span class="sourceLineNo">2558</span> if (!filesToRemove.isEmpty()) {<a name="line.2558"></a> -<span class="sourceLineNo">2559</span> if (LOG.isDebugEnabled()) {<a name="line.2559"></a> -<span class="sourceLineNo">2560</span> LOG.debug("Moving the files " + filesToRemove + " to archive");<a name="line.2560"></a> -<span class="sourceLineNo">2561</span> }<a name="line.2561"></a> -<span class="sourceLineNo">2562</span> // Only if this is successful it has to be removed<a name="line.2562"></a> -<span class="sourceLineNo">2563</span> try {<a name="line.2563"></a> -<span class="sourceLineNo">2564</span> this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);<a name="line.2564"></a> -<span class="sourceLineNo">2565</span> } catch (FailedArchiveException fae) {<a name="line.2565"></a> -<span class="sourceLineNo">2566</span> // Even if archiving some files failed, we still need to clear out any of the<a name="line.2566"></a> -<span class="sourceLineNo">2567</span> // files which were successfully archived. Otherwise we will receive a<a name="line.2567"></a> -<span class="sourceLineNo">2568</span> // FileNotFoundException when we attempt to re-archive them in the next go around.<a name="line.2568"></a> -<span class="sourceLineNo">2569</span> Collection<Path> failedFiles = fae.getFailedFiles();<a name="line.2569"></a> -<span class="sourceLineNo">2570</span> Iterator<StoreFile> iter = filesToRemove.iterator();<a name="line.2570"></a> -<span class="sourceLineNo">2571</span> while (iter.hasNext()) {<a name="line.2571"></a> -<span class="sourceLineNo">2572</span> if (failedFiles.contains(iter.next().getPath())) {<a name="line.2572"></a> -<span class="sourceLineNo">2573</span> iter.remove();<a name="line.2573"></a> -<span class="sourceLineNo">2574</span> }<a name="line.2574"></a> -<span class="sourceLineNo">2575</span> }<a name="line.2575"></a> -<span class="sourceLineNo">2576</span> if (!filesToRemove.isEmpty()) {<a name="line.2576"></a> -<span class="sourceLineNo">2577</span> clearCompactedfiles(filesToRemove);<a name="line.2577"></a> -<span class="sourceLineNo">2578</span> }<a name="line.2578"></a> -<span class="sourceLineNo">2579</span> throw fae;<a name="line.2579"></a> -<span class="sourceLineNo">2580</span> }<a name="line.2580"></a> -<span class="sourceLineNo">2581</span> }<a name="line.2581"></a> -<span class="sourceLineNo">2582</span> }<a name="line.2582"></a> -<span class="sourceLineNo">2583</span> if (!filesToRemove.isEmpty()) {<a name="line.2583"></a> -<span class="sourceLineNo">2584</span> // Clear the compactedfiles from the store file manager<a name="line.2584"></a> -<span class="sourceLineNo">2585</span> clearCompactedfiles(filesToRemove);<a name="line.2585"></a> -<span class="sourceLineNo">2586</span> }<a name="line.2586"></a> -<span class="sourceLineNo">2587</span> }<a name="line.2587"></a> -<span class="sourceLineNo">2588</span><a name="line.2588"></a> -<span class="sourceLineNo">2589</span> public Long preFlushSeqIDEstimation() {<a name="line.2589"></a> -<span class="sourceLineNo">2590</span> return memstore.preFlushSeqIDEstimation();<a name="line.2590"></a> -<span class="sourceLineNo">2591</span> }<a name="line.2591"></a> -<span class="sourceLineNo">2592</span><a name="line.2592"></a> -<span class="sourceLineNo">2593</span> @Override<a name="line.2593"></a> -<span class="sourceLineNo">2594</span> public boolean isSloppyMemstore() {<a name="line.2594"></a> -<span class="sourceLineNo">2595</span> return this.memstore.isSloppy();<a name="line.2595"></a> -<span class="sourceLineNo">2596</span> }<a name="line.2596"></a> -<span class="sourceLineNo">2597</span><a name="line.2597"></a> -<span class="sourceLineNo">2598</span> private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {<a name="line.2598"></a> -<span class="sourceLineNo">2599</span> if (LOG.isTraceEnabled()) {<a name="line.2599"></a> -<span class="sourceLineNo">2600</span> LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");<a name="line.2600"></a> -<span class="sourceLineNo">2601</span> }<a name="line.2601"></a> -<span class="sourceLineNo">2602</span> try {<a name="line.2602"></a> -<span class="sourceLineNo">2603</span> lock.writeLock().lock();<a name="line.2603"></a> -<span class="sourceLineNo">2604</span> this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);<a name="line.2604"></a> -<span class="sourceLineNo">2605</span> } finally {<a name="line.2605"></a> -<span class="sourceLineNo">2606</span> lock.writeLock().unlock();<a name="line.2606"></a> -<span class="sourceLineNo">2607</span> }<a name="line.2607"></a> -<span class="sourceLineNo">2608</span> }<a name="line.2608"></a> -<span class="sourceLineNo">2609</span>}<a name="line.2609"></a> +<span class="sourceLineNo">2489</span> /**<a name="line.2489"></a> +<span class="sourceLineNo">2490</span> * Sets the store up for a region level snapshot operation.<a name="line.2490"></a> +<span class="sourceLineNo">2491</span> * @see #postSnapshotOperation()<a name="line.2491"></a> +<span class="sourceLineNo">2492</span> */<a name="line.2492"></a> +<span class="sourceLineNo">2493</span> public void preSnapshotOperation() {<a name="line.2493"></a> +<span class="sourceLineNo">2494</span> archiveLock.lock();<a name="line.2494"></a> +<span class="sourceLineNo">2495</span> }<a name="line.2495"></a> +<span class="sourceLineNo">2496</span><a name="line.2496"></a> +<span class="sourceLineNo">2497</span> /**<a name="line.2497"></a> +<span class="sourceLineNo">2498</span> * Perform tasks needed after the completion of snapshot operation.<a name="line.2498"></a> +<span class="sourceLineNo">2499</span> * @see #preSnapshotOperation()<a name="line.2499"></a> +<span class="sourceLineNo">2500</span> */<a name="line.2500"></a> +<span class="sourceLineNo">2501</span> public void postSnapshotOperation() {<a name="line.2501"></a> +<span class="sourceLineNo">2502</span> archiveLock.unlock();<a name="line.2502"></a> +<span class="sourceLineNo">2503</span> }<a name="line.2503"></a> +<span class="sourceLineNo">2504</span><a name="line.2504"></a> +<span class="sourceLineNo">2505</span> @Override<a name="line.2505"></a> +<span class="sourceLineNo">2506</span> public synchronized void closeAndArchiveCompactedFiles() throws IOException {<a name="line.2506"></a> +<span class="sourceLineNo">2507</span> // ensure other threads do not attempt to archive the same files on close()<a name="line.2507"></a> +<span class="sourceLineNo">2508</span> archiveLock.lock();<a name="line.2508"></a> +<span class="sourceLineNo">2509</span> try {<a name="line.2509"></a> +<span class="sourceLineNo">2510</span> lock.readLock().lock();<a name="line.2510"></a> +<span class="sourceLineNo">2511</span> Collection<StoreFile> copyCompactedfiles = null;<a name="line.2511"></a> +<span class="sourceLineNo">2512</span> try {<a name="line.2512"></a> +<span class="sourceLineNo">2513</span> Collection<StoreFile> compactedfiles =<a name="line.2513"></a> +<span class="sourceLineNo">2514</span> this.getStoreEngine().getStoreFileManager().getCompactedfiles();<a name="line.2514"></a> +<span class="sourceLineNo">2515</span> if (compactedfiles != null && compactedfiles.size() != 0) {<a name="line.2515"></a> +<span class="sourceLineNo">2516</span> // Do a copy under read lock<a name="line.2516"></a> +<span class="sourceLineNo">2517</span> copyCompactedfiles = new ArrayList<>(compactedfiles);<a name="line.2517"></a> +<span class="sourceLineNo">2518</span> } else {<a name="line.2518"></a> +<span class="sourceLineNo">2519</span> if (LOG.isTraceEnabled()) {<a name="line.2519"></a> +<span class="sourceLineNo">2520</span> LOG.trace("No compacted files to archive");<a name="line.2520"></a> +<span class="sourceLineNo">2521</span> return;<a name="line.2521"></a> +<span class="sourceLineNo">2522</span> }<a name="line.2522"></a> +<span class="sourceLineNo">2523</span> }<a name="line.2523"></a> +<span class="sourceLineNo">2524</span> } finally {<a name="line.2524"></a> +<span class="sourceLineNo">2525</span> lock.readLock().unlock();<a name="line.2525"></a> +<span class="sourceLineNo">2526</span> }<a name="line.2526"></a> +<span class="sourceLineNo">2527</span> if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {<a name="line.2527"></a> +<span class="sourceLineNo">2528</span> removeCompactedfiles(copyCompactedfiles);<a name="line.2528"></a> +<span class="sourceLineNo">2529</span> }<a name="line.2529"></a> +<span class="sourceLineNo">2530</span> } finally {<a name="line.2530"></a> +<span class="sourceLineNo">2531</span> archiveLock.unlock();<a name="line.2531"></a> +<span class="sourceLineNo">2532</span> }<a name="line.2532"></a> +<span class="sourceLineNo">2533</span> }<a name="line.2533"></a> +<span class="sourceLineNo">2534</span><a name="line.2534"></a> +<span class="sourceLineNo">2535</span> /**<a name="line.2535"></a> +<span class="sourceLineNo">2536</span> * Archives and removes the compacted files<a name="line.2536"></a> +<span class="sourceLineNo">2537</span> * @param compactedfiles The compacted files in this store that are not active in reads<a name="line.2537"></a> +<span class="sourceLineNo">2538</span> * @throws IOException<a name="line.2538"></a> +<span class="sourceLineNo">2539</span> */<a name="line.2539"></a> +<span class="sourceLineNo">2540</span> private void removeCompactedfiles(Collection<StoreFile> compactedfiles)<a name="line.2540"></a> +<span class="sourceLineNo">2541</span> throws IOException {<a name="line.2541"></a> +<span class="sourceLineNo">2542</span> final List<StoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());<a name="line.2542"></a> +<span class="sourceLineNo">2543</span> for (final StoreFile file : compactedfiles) {<a name="line.2543"></a> +<span class="sourceLineNo">2544</span> synchronized (file) {<a name="line.2544"></a> +<span class="sourceLineNo">2545</span> try {<a name="line.2545"></a> +<span class="sourceLineNo">2546</span> StoreFileReader r = file.getReader();<a name="line.2546"></a> +<span class="sourceLineNo">2547</span> if (r == null) {<a name="line.2547"></a> +<span class="sourceLineNo">2548</span> if (LOG.isDebugEnabled()) {<a name="line.2548"></a> +<span class="sourceLineNo">2549</span> LOG.debug("The file " + file + " was closed but still not archived.");<a name="line.2549"></a> +<span class="sourceLineNo">2550</span> }<a name="line.2550"></a> +<span class="sourceLineNo">2551</span> filesToRemove.add(file);<a name="line.2551"></a> +<span class="sourceLineNo">2552</span> continue;<a name="line.2552"></a> +<span class="sourceLineNo">2553</span> }<a name="line.2553"></a> +<span class="sourceLineNo">2554</span> if (file.isCompactedAway() && !file.isReferencedInReads()) {<a name="line.2554"></a> +<span class="sourceLineNo">2555</span> // Even if deleting fails we need not bother as any new scanners won't be<a name="line.2555"></a> +<span class="sourceLineNo">2556</span> // able to use the compacted file as the status is already compactedAway<a name="line.2556"></a> +<span class="sourceLineNo">2557</span> if (LOG.isTraceEnabled()) {<a name="line.2557"></a> +<span class="sourceLineNo">2558</span> LOG.trace("Closing and archiving the file " + file.getPath());<a name="line.2558"></a> +<span class="sourceLineNo">2559</span> }<a name="line.2559"></a> +<span class="sourceLineNo">2560</span> r.close(true);<a name="line.2560"></a> +<span class="sourceLineNo">2561</span> // Just close and return<a name="line.2561"></a> +<span class="sourceLineNo">2562</span> filesToRemove.add(file);<a name="line.2562"></a> +<span class="sourceLineNo">2563</span> }<a name="line.2563"></a> +<span class="sourceLineNo">2564</span> } catch (Exception e) {<a name="line.2564"></a> +<span class="sourceLineNo">2565</span> LOG.error(<a name="line.2565"></a> +<span class="sourceLineNo">2566</span> "Exception while trying to close the compacted store file " + file.getPath().getName());<a name="line.2566"></a> +<span class="sourceLineNo">2567</span> }<a name="line.2567"></a> +<span class="sourceLineNo">2568</span> }<a name="line.2568"></a> +<span class="sourceLineNo">2569</span> }<a name="line.2569"></a> +<span class="sourceLineNo">2570</span> if (this.isPrimaryReplicaStore()) {<a name="line.2570"></a> +<span class="sourceLineNo">2571</span> // Only the primary region is allowed to move the file to archive.<a name="line.2571"></a> +<span class="sourceLineNo">2572</span> // The secondary region does not move the files to archive. Any active reads from<a name="line.2572"></a> +<span class="sourceLineNo">2573</span> // the secondary region will still work because the file as such has active readers on it.<a name="line.2573"></a> +<span class="sourceLineNo">2574</span> if (!filesToRemove.isEmpty()) {<a name="line.2574"></a> +<span class="sourceLineNo">2575</span> if (LOG.isDebugEnabled()) {<a name="line.2575"></a> +<span class="sourceLineNo">2576</span> LOG.debug("Moving the files " + filesToRemove + " to archive");<a name="line.2576"></a> +<span class="sourceLineNo">2577</span> }<a name="line.2577"></a> +<span class="sourceLineNo">2578</span> // Only if this is successful it has to be removed<a name="line.2578"></a> +<span class="sourceLineNo">2579</span> try {<a name="line.2579"></a> +<span class="sourceLineNo">2580</span> this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);<a name="line.2580"></a> +<span class="sourceLineNo">2581</span> } catch (FailedArchiveException fae) {<a name="line.2581"></a> +<span class="sourceLineNo">2582</span> // Even if archiving some files failed, we still need to clear out any of the<a name="line.2582"></a> +<span class="sourceLineNo">2583</span> // files which were successfully archived. Otherwise we will receive a<a name="line.2583"></a> +<span class="sourceLineNo">2584</span> // FileNotFoundException when we attempt to re-archive them in the next go around.<a name="line.2584"></a> +<span class="sourceLineNo">2585</span> Collection<Path> failedFiles = fae.getFailedFiles();<a name="line.2585"></a> +<span class="sourceLineNo">2586</span> Iterator<StoreFile> iter = filesToRemove.iterator();<a name="line.2586"></a> +<span class="sourceLineNo">2587</span> while (iter.hasNext()) {<a name="line.2587"></a> +<span class="sourceLineNo">2588</span> if (failedFiles.contains(iter.next().getPath())) {<a name="line.2588"></a> +<span class="sourceLineNo">2589</span> iter.remove();<a name="line.2589"></a> +<span class="sourceLineNo">2590</span> }<a name="line.2590"></a> +<span class="sourceLineNo">2591</span> }<a name="line.2591"></a> +<span class="sourceLineNo">2592</span> if (!filesToRemove.isEmpty()) {<a name="line.2592"></a> +<span class="sourceLineNo">2593</span> clearCompactedfiles(filesToRemove);<a name="line.2593"></a> +<span class="sourceLineNo">2594</span> }<a name="line.2594"></a> +<span class="sourceLineNo">2595</span> throw fae;<a name="line.2595"></a> +<span class="sourceLineNo">2596</span> }<a name="line.2596"></a> +<span class="sourceLineNo">2597</span> }<a name="line.2597"></a> +<span class="sourceLineNo">2598</span> }<a name="line.2598"></a> +<span class="sourceLineNo">2599</span> if (!filesToRemove.isEmpty()) {<a name="line.2599"></a> +<span class="sourceLineNo">2600</span> // Clear the compactedfiles from the store file manager<a name="line.2600"></a> +<span class="sourceLineNo">2601</span> clearCompactedfiles(filesToRemove);<a name="line.2601"></a> +<span class="sourceLineNo">2602</span> }<a name="line.2602"></a> +<span class="sourceLineNo">2603</span> }<a name="line.2603"></a> +<span class="sourceLineNo">2604</span><a name="line.2604"></a> +<span class="sourceLineNo">2605</span> public Long preFlushSeqIDEstimation() {<a name="line.2605"></a> +<span class="sourceLineNo">2606</span> return memstore.preFlushSeqIDEstimation();<a name="line.2606"></a> +<span class="sourceLineNo">2607</span> }<a name="line.2607"></a> +<span class="sourceLineNo">2608</span><a name="line.2608"></a> +<span class="sourceLineNo">2609</span> @Override<a name="line.2609"></a> +<span class="sourceLineNo">2610</span> public boolean isSloppyMemstore() {<a name="line.2610"></a> +<span class="sourceLineNo">2611</span> return this.memstore.isSloppy();<a name="line.2611"></a> +<span class="sourceLineNo">2612</span> }<a name="line.2612"></a> +<span class="sourceLineNo">2613</span><a name="line.2613"></a> +<span class="sourceLineNo">2614</span> private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {<a name="line.2614"></a> +<span class="sourceLineNo">2615</span> if (LOG.isTraceEnabled()) {<a name="line.2615"></a> +<span class="sourceLineNo">2616</span> LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");<a name="line.2616"></a> +<span class="sourceLineNo">2617</span> }<a name="line.2617"></a> +<span class="sourceLineNo">2618</span> try {<a name="line.2618"></a> +<span class="sourceLineNo">2619</span> lock.writeLock().lock();<a name="line.2619"></a> +<span class="sourceLineNo">2620</span> this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);<a name="line.2620"></a> +<span class="sourceLineNo">2621</span> } finally {<a name="line.2621"></a> +<span class="sourceLineNo">2622</span> lock.writeLock().unlock();<a name="line.2622"></a> +<span class="sourceLineNo">2623</span> }<a name="line.2623"></a> +<span class="sourceLineNo">2624</span> }<a name="line.2624"></a> +<span class="sourceLineNo">2625</span>}<a name="line.2625"></a>