http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a108018f/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockContext.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockContext.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockContext.html index 5c5e0eb..7b41db2 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockContext.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockContext.html @@ -2960,5416 +2960,5447 @@ <span class="sourceLineNo">2952</span> protected final ObservedExceptionsInBatch observedExceptions;<a name="line.2952"></a> <span class="sourceLineNo">2953</span> //Durability of the batch (highest durability of all operations)<a name="line.2953"></a> <span class="sourceLineNo">2954</span> protected Durability durability;<a name="line.2954"></a> -<span class="sourceLineNo">2955</span><a name="line.2955"></a> -<span class="sourceLineNo">2956</span> public BatchOperation(final HRegion region, T[] operations) {<a name="line.2956"></a> -<span class="sourceLineNo">2957</span> this.operations = operations;<a name="line.2957"></a> -<span class="sourceLineNo">2958</span> this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2958"></a> -<span class="sourceLineNo">2959</span> Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2959"></a> -<span class="sourceLineNo">2960</span> this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2960"></a> -<span class="sourceLineNo">2961</span> familyCellMaps = new Map[operations.length];<a name="line.2961"></a> -<span class="sourceLineNo">2962</span><a name="line.2962"></a> -<span class="sourceLineNo">2963</span> this.region = region;<a name="line.2963"></a> -<span class="sourceLineNo">2964</span> observedExceptions = new ObservedExceptionsInBatch();<a name="line.2964"></a> -<span class="sourceLineNo">2965</span> durability = Durability.USE_DEFAULT;<a name="line.2965"></a> -<span class="sourceLineNo">2966</span> }<a name="line.2966"></a> -<span class="sourceLineNo">2967</span><a name="line.2967"></a> -<span class="sourceLineNo">2968</span> /**<a name="line.2968"></a> -<span class="sourceLineNo">2969</span> * Visitor interface for batch operations<a name="line.2969"></a> -<span class="sourceLineNo">2970</span> */<a name="line.2970"></a> -<span class="sourceLineNo">2971</span> @FunctionalInterface<a name="line.2971"></a> -<span class="sourceLineNo">2972</span> public interface Visitor {<a name="line.2972"></a> -<span class="sourceLineNo">2973</span> /**<a name="line.2973"></a> -<span class="sourceLineNo">2974</span> * @param index operation index<a name="line.2974"></a> -<span class="sourceLineNo">2975</span> * @return If true continue visiting remaining entries, break otherwise<a name="line.2975"></a> -<span class="sourceLineNo">2976</span> */<a name="line.2976"></a> -<span class="sourceLineNo">2977</span> boolean visit(int index) throws IOException;<a name="line.2977"></a> -<span class="sourceLineNo">2978</span> }<a name="line.2978"></a> -<span class="sourceLineNo">2979</span><a name="line.2979"></a> -<span class="sourceLineNo">2980</span> /**<a name="line.2980"></a> -<span class="sourceLineNo">2981</span> * Helper method for visiting pending/ all batch operations<a name="line.2981"></a> -<span class="sourceLineNo">2982</span> */<a name="line.2982"></a> -<span class="sourceLineNo">2983</span> public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)<a name="line.2983"></a> -<span class="sourceLineNo">2984</span> throws IOException {<a name="line.2984"></a> -<span class="sourceLineNo">2985</span> assert lastIndexExclusive <= this.size();<a name="line.2985"></a> -<span class="sourceLineNo">2986</span> for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) {<a name="line.2986"></a> -<span class="sourceLineNo">2987</span> if (!pendingOnly || isOperationPending(i)) {<a name="line.2987"></a> -<span class="sourceLineNo">2988</span> if (!visitor.visit(i)) {<a name="line.2988"></a> -<span class="sourceLineNo">2989</span> break;<a name="line.2989"></a> -<span class="sourceLineNo">2990</span> }<a name="line.2990"></a> -<span class="sourceLineNo">2991</span> }<a name="line.2991"></a> -<span class="sourceLineNo">2992</span> }<a name="line.2992"></a> -<span class="sourceLineNo">2993</span> }<a name="line.2993"></a> -<span class="sourceLineNo">2994</span><a name="line.2994"></a> -<span class="sourceLineNo">2995</span> public abstract Mutation getMutation(int index);<a name="line.2995"></a> -<span class="sourceLineNo">2996</span> public abstract long getNonceGroup(int index);<a name="line.2996"></a> -<span class="sourceLineNo">2997</span> public abstract long getNonce(int index);<a name="line.2997"></a> -<span class="sourceLineNo">2998</span> /** This method is potentially expensive and useful mostly for non-replay CP path. */<a name="line.2998"></a> -<span class="sourceLineNo">2999</span> public abstract Mutation[] getMutationsForCoprocs();<a name="line.2999"></a> -<span class="sourceLineNo">3000</span> public abstract boolean isInReplay();<a name="line.3000"></a> -<span class="sourceLineNo">3001</span> public abstract long getOrigLogSeqNum();<a name="line.3001"></a> -<span class="sourceLineNo">3002</span> public abstract void startRegionOperation() throws IOException;<a name="line.3002"></a> -<span class="sourceLineNo">3003</span> public abstract void closeRegionOperation() throws IOException;<a name="line.3003"></a> -<span class="sourceLineNo">3004</span><a name="line.3004"></a> -<span class="sourceLineNo">3005</span> /**<a name="line.3005"></a> -<span class="sourceLineNo">3006</span> * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs<a name="line.3006"></a> -<span class="sourceLineNo">3007</span> * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on<a name="line.3007"></a> -<span class="sourceLineNo">3008</span> * entire batch and will be called from outside of class to check and prepare batch. This can<a name="line.3008"></a> -<span class="sourceLineNo">3009</span> * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a<a name="line.3009"></a> -<span class="sourceLineNo">3010</span> * 'for' loop over mutations.<a name="line.3010"></a> -<span class="sourceLineNo">3011</span> */<a name="line.3011"></a> -<span class="sourceLineNo">3012</span> public abstract void checkAndPrepare() throws IOException;<a name="line.3012"></a> -<span class="sourceLineNo">3013</span><a name="line.3013"></a> -<span class="sourceLineNo">3014</span> /**<a name="line.3014"></a> -<span class="sourceLineNo">3015</span> * Implement any Put request specific check and prepare logic here. Please refer to<a name="line.3015"></a> -<span class="sourceLineNo">3016</span> * {@link #checkAndPrepareMutation(Mutation, long)} for how its used.<a name="line.3016"></a> -<span class="sourceLineNo">3017</span> */<a name="line.3017"></a> -<span class="sourceLineNo">3018</span> protected abstract void checkAndPreparePut(final Put p) throws IOException;<a name="line.3018"></a> -<span class="sourceLineNo">3019</span><a name="line.3019"></a> -<span class="sourceLineNo">3020</span> /**<a name="line.3020"></a> -<span class="sourceLineNo">3021</span> * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell<a name="line.3021"></a> -<span class="sourceLineNo">3022</span> * count, tags and timestamp for all cells of all operations in a mini-batch.<a name="line.3022"></a> -<span class="sourceLineNo">3023</span> */<a name="line.3023"></a> -<span class="sourceLineNo">3024</span> public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation><a name="line.3024"></a> -<span class="sourceLineNo">3025</span> miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException;<a name="line.3025"></a> -<span class="sourceLineNo">3026</span><a name="line.3026"></a> -<span class="sourceLineNo">3027</span> /**<a name="line.3027"></a> -<span class="sourceLineNo">3028</span> * Write mini-batch operations to MemStore<a name="line.3028"></a> -<span class="sourceLineNo">3029</span> */<a name="line.3029"></a> -<span class="sourceLineNo">3030</span> public abstract WriteEntry writeMiniBatchOperationsToMemStore(<a name="line.3030"></a> -<span class="sourceLineNo">3031</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)<a name="line.3031"></a> -<span class="sourceLineNo">3032</span> throws IOException;<a name="line.3032"></a> -<span class="sourceLineNo">3033</span><a name="line.3033"></a> -<span class="sourceLineNo">3034</span> protected void writeMiniBatchOperationsToMemStore(<a name="line.3034"></a> -<span class="sourceLineNo">3035</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)<a name="line.3035"></a> -<span class="sourceLineNo">3036</span> throws IOException {<a name="line.3036"></a> -<span class="sourceLineNo">3037</span> MemStoreSizing memStoreAccounting = new MemStoreSizing();<a name="line.3037"></a> -<span class="sourceLineNo">3038</span> visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {<a name="line.3038"></a> -<span class="sourceLineNo">3039</span> // We need to update the sequence id for following reasons.<a name="line.3039"></a> -<span class="sourceLineNo">3040</span> // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.<a name="line.3040"></a> -<span class="sourceLineNo">3041</span> // 2) If no WAL, FSWALEntry won't be used<a name="line.3041"></a> -<span class="sourceLineNo">3042</span> // we use durability of the original mutation for the mutation passed by CP.<a name="line.3042"></a> -<span class="sourceLineNo">3043</span> if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {<a name="line.3043"></a> -<span class="sourceLineNo">3044</span> region.updateSequenceId(familyCellMaps[index].values(), writeNumber);<a name="line.3044"></a> -<span class="sourceLineNo">3045</span> }<a name="line.3045"></a> -<span class="sourceLineNo">3046</span> applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);<a name="line.3046"></a> -<span class="sourceLineNo">3047</span> return true;<a name="line.3047"></a> -<span class="sourceLineNo">3048</span> });<a name="line.3048"></a> -<span class="sourceLineNo">3049</span> // update memStore size<a name="line.3049"></a> -<span class="sourceLineNo">3050</span> region.addAndGetMemStoreSize(memStoreAccounting);<a name="line.3050"></a> -<span class="sourceLineNo">3051</span> }<a name="line.3051"></a> -<span class="sourceLineNo">3052</span><a name="line.3052"></a> -<span class="sourceLineNo">3053</span> public boolean isDone() {<a name="line.3053"></a> -<span class="sourceLineNo">3054</span> return nextIndexToProcess == operations.length;<a name="line.3054"></a> -<span class="sourceLineNo">3055</span> }<a name="line.3055"></a> -<span class="sourceLineNo">3056</span><a name="line.3056"></a> -<span class="sourceLineNo">3057</span> public int size() {<a name="line.3057"></a> -<span class="sourceLineNo">3058</span> return operations.length;<a name="line.3058"></a> -<span class="sourceLineNo">3059</span> }<a name="line.3059"></a> -<span class="sourceLineNo">3060</span><a name="line.3060"></a> -<span class="sourceLineNo">3061</span> public boolean isOperationPending(int index) {<a name="line.3061"></a> -<span class="sourceLineNo">3062</span> return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;<a name="line.3062"></a> -<span class="sourceLineNo">3063</span> }<a name="line.3063"></a> -<span class="sourceLineNo">3064</span><a name="line.3064"></a> -<span class="sourceLineNo">3065</span> public List<UUID> getClusterIds() {<a name="line.3065"></a> -<span class="sourceLineNo">3066</span> assert size() != 0;<a name="line.3066"></a> -<span class="sourceLineNo">3067</span> return getMutation(0).getClusterIds();<a name="line.3067"></a> -<span class="sourceLineNo">3068</span> }<a name="line.3068"></a> -<span class="sourceLineNo">3069</span><a name="line.3069"></a> -<span class="sourceLineNo">3070</span> /**<a name="line.3070"></a> -<span class="sourceLineNo">3071</span> * Helper method that checks and prepares only one mutation. This can be used to implement<a name="line.3071"></a> -<span class="sourceLineNo">3072</span> * {@link #checkAndPrepare()} for entire Batch.<a name="line.3072"></a> -<span class="sourceLineNo">3073</span> * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called<a name="line.3073"></a> -<span class="sourceLineNo">3074</span> * after prePut()/ preDelete() CP hooks are run for the mutation<a name="line.3074"></a> -<span class="sourceLineNo">3075</span> */<a name="line.3075"></a> -<span class="sourceLineNo">3076</span> protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)<a name="line.3076"></a> -<span class="sourceLineNo">3077</span> throws IOException {<a name="line.3077"></a> -<span class="sourceLineNo">3078</span> region.checkRow(mutation.getRow(), "batchMutate");<a name="line.3078"></a> -<span class="sourceLineNo">3079</span> if (mutation instanceof Put) {<a name="line.3079"></a> -<span class="sourceLineNo">3080</span> // Check the families in the put. If bad, skip this one.<a name="line.3080"></a> -<span class="sourceLineNo">3081</span> checkAndPreparePut((Put) mutation);<a name="line.3081"></a> -<span class="sourceLineNo">3082</span> region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);<a name="line.3082"></a> -<span class="sourceLineNo">3083</span> } else {<a name="line.3083"></a> -<span class="sourceLineNo">3084</span> region.prepareDelete((Delete) mutation);<a name="line.3084"></a> -<span class="sourceLineNo">3085</span> }<a name="line.3085"></a> -<span class="sourceLineNo">3086</span> }<a name="line.3086"></a> -<span class="sourceLineNo">3087</span><a name="line.3087"></a> -<span class="sourceLineNo">3088</span> protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {<a name="line.3088"></a> -<span class="sourceLineNo">3089</span> Mutation mutation = getMutation(index);<a name="line.3089"></a> -<span class="sourceLineNo">3090</span> try {<a name="line.3090"></a> -<span class="sourceLineNo">3091</span> this.checkAndPrepareMutation(mutation, timestamp);<a name="line.3091"></a> +<span class="sourceLineNo">2955</span> protected boolean atomic = false;<a name="line.2955"></a> +<span class="sourceLineNo">2956</span><a name="line.2956"></a> +<span class="sourceLineNo">2957</span> public BatchOperation(final HRegion region, T[] operations) {<a name="line.2957"></a> +<span class="sourceLineNo">2958</span> this.operations = operations;<a name="line.2958"></a> +<span class="sourceLineNo">2959</span> this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2959"></a> +<span class="sourceLineNo">2960</span> Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2960"></a> +<span class="sourceLineNo">2961</span> this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2961"></a> +<span class="sourceLineNo">2962</span> familyCellMaps = new Map[operations.length];<a name="line.2962"></a> +<span class="sourceLineNo">2963</span><a name="line.2963"></a> +<span class="sourceLineNo">2964</span> this.region = region;<a name="line.2964"></a> +<span class="sourceLineNo">2965</span> observedExceptions = new ObservedExceptionsInBatch();<a name="line.2965"></a> +<span class="sourceLineNo">2966</span> durability = Durability.USE_DEFAULT;<a name="line.2966"></a> +<span class="sourceLineNo">2967</span> }<a name="line.2967"></a> +<span class="sourceLineNo">2968</span><a name="line.2968"></a> +<span class="sourceLineNo">2969</span> /**<a name="line.2969"></a> +<span class="sourceLineNo">2970</span> * Visitor interface for batch operations<a name="line.2970"></a> +<span class="sourceLineNo">2971</span> */<a name="line.2971"></a> +<span class="sourceLineNo">2972</span> @FunctionalInterface<a name="line.2972"></a> +<span class="sourceLineNo">2973</span> public interface Visitor {<a name="line.2973"></a> +<span class="sourceLineNo">2974</span> /**<a name="line.2974"></a> +<span class="sourceLineNo">2975</span> * @param index operation index<a name="line.2975"></a> +<span class="sourceLineNo">2976</span> * @return If true continue visiting remaining entries, break otherwise<a name="line.2976"></a> +<span class="sourceLineNo">2977</span> */<a name="line.2977"></a> +<span class="sourceLineNo">2978</span> boolean visit(int index) throws IOException;<a name="line.2978"></a> +<span class="sourceLineNo">2979</span> }<a name="line.2979"></a> +<span class="sourceLineNo">2980</span><a name="line.2980"></a> +<span class="sourceLineNo">2981</span> /**<a name="line.2981"></a> +<span class="sourceLineNo">2982</span> * Helper method for visiting pending/ all batch operations<a name="line.2982"></a> +<span class="sourceLineNo">2983</span> */<a name="line.2983"></a> +<span class="sourceLineNo">2984</span> public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)<a name="line.2984"></a> +<span class="sourceLineNo">2985</span> throws IOException {<a name="line.2985"></a> +<span class="sourceLineNo">2986</span> assert lastIndexExclusive <= this.size();<a name="line.2986"></a> +<span class="sourceLineNo">2987</span> for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) {<a name="line.2987"></a> +<span class="sourceLineNo">2988</span> if (!pendingOnly || isOperationPending(i)) {<a name="line.2988"></a> +<span class="sourceLineNo">2989</span> if (!visitor.visit(i)) {<a name="line.2989"></a> +<span class="sourceLineNo">2990</span> break;<a name="line.2990"></a> +<span class="sourceLineNo">2991</span> }<a name="line.2991"></a> +<span class="sourceLineNo">2992</span> }<a name="line.2992"></a> +<span class="sourceLineNo">2993</span> }<a name="line.2993"></a> +<span class="sourceLineNo">2994</span> }<a name="line.2994"></a> +<span class="sourceLineNo">2995</span><a name="line.2995"></a> +<span class="sourceLineNo">2996</span> public abstract Mutation getMutation(int index);<a name="line.2996"></a> +<span class="sourceLineNo">2997</span> public abstract long getNonceGroup(int index);<a name="line.2997"></a> +<span class="sourceLineNo">2998</span> public abstract long getNonce(int index);<a name="line.2998"></a> +<span class="sourceLineNo">2999</span> /** This method is potentially expensive and useful mostly for non-replay CP path. */<a name="line.2999"></a> +<span class="sourceLineNo">3000</span> public abstract Mutation[] getMutationsForCoprocs();<a name="line.3000"></a> +<span class="sourceLineNo">3001</span> public abstract boolean isInReplay();<a name="line.3001"></a> +<span class="sourceLineNo">3002</span> public abstract long getOrigLogSeqNum();<a name="line.3002"></a> +<span class="sourceLineNo">3003</span> public abstract void startRegionOperation() throws IOException;<a name="line.3003"></a> +<span class="sourceLineNo">3004</span> public abstract void closeRegionOperation() throws IOException;<a name="line.3004"></a> +<span class="sourceLineNo">3005</span><a name="line.3005"></a> +<span class="sourceLineNo">3006</span> /**<a name="line.3006"></a> +<span class="sourceLineNo">3007</span> * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs<a name="line.3007"></a> +<span class="sourceLineNo">3008</span> * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on<a name="line.3008"></a> +<span class="sourceLineNo">3009</span> * entire batch and will be called from outside of class to check and prepare batch. This can<a name="line.3009"></a> +<span class="sourceLineNo">3010</span> * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a<a name="line.3010"></a> +<span class="sourceLineNo">3011</span> * 'for' loop over mutations.<a name="line.3011"></a> +<span class="sourceLineNo">3012</span> */<a name="line.3012"></a> +<span class="sourceLineNo">3013</span> public abstract void checkAndPrepare() throws IOException;<a name="line.3013"></a> +<span class="sourceLineNo">3014</span><a name="line.3014"></a> +<span class="sourceLineNo">3015</span> /**<a name="line.3015"></a> +<span class="sourceLineNo">3016</span> * Implement any Put request specific check and prepare logic here. Please refer to<a name="line.3016"></a> +<span class="sourceLineNo">3017</span> * {@link #checkAndPrepareMutation(Mutation, long)} for how its used.<a name="line.3017"></a> +<span class="sourceLineNo">3018</span> */<a name="line.3018"></a> +<span class="sourceLineNo">3019</span> protected abstract void checkAndPreparePut(final Put p) throws IOException;<a name="line.3019"></a> +<span class="sourceLineNo">3020</span><a name="line.3020"></a> +<span class="sourceLineNo">3021</span> /**<a name="line.3021"></a> +<span class="sourceLineNo">3022</span> * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell<a name="line.3022"></a> +<span class="sourceLineNo">3023</span> * count, tags and timestamp for all cells of all operations in a mini-batch.<a name="line.3023"></a> +<span class="sourceLineNo">3024</span> */<a name="line.3024"></a> +<span class="sourceLineNo">3025</span> public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation><a name="line.3025"></a> +<span class="sourceLineNo">3026</span> miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException;<a name="line.3026"></a> +<span class="sourceLineNo">3027</span><a name="line.3027"></a> +<span class="sourceLineNo">3028</span> /**<a name="line.3028"></a> +<span class="sourceLineNo">3029</span> * Write mini-batch operations to MemStore<a name="line.3029"></a> +<span class="sourceLineNo">3030</span> */<a name="line.3030"></a> +<span class="sourceLineNo">3031</span> public abstract WriteEntry writeMiniBatchOperationsToMemStore(<a name="line.3031"></a> +<span class="sourceLineNo">3032</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)<a name="line.3032"></a> +<span class="sourceLineNo">3033</span> throws IOException;<a name="line.3033"></a> +<span class="sourceLineNo">3034</span><a name="line.3034"></a> +<span class="sourceLineNo">3035</span> protected void writeMiniBatchOperationsToMemStore(<a name="line.3035"></a> +<span class="sourceLineNo">3036</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)<a name="line.3036"></a> +<span class="sourceLineNo">3037</span> throws IOException {<a name="line.3037"></a> +<span class="sourceLineNo">3038</span> MemStoreSizing memStoreAccounting = new MemStoreSizing();<a name="line.3038"></a> +<span class="sourceLineNo">3039</span> visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {<a name="line.3039"></a> +<span class="sourceLineNo">3040</span> // We need to update the sequence id for following reasons.<a name="line.3040"></a> +<span class="sourceLineNo">3041</span> // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.<a name="line.3041"></a> +<span class="sourceLineNo">3042</span> // 2) If no WAL, FSWALEntry won't be used<a name="line.3042"></a> +<span class="sourceLineNo">3043</span> // we use durability of the original mutation for the mutation passed by CP.<a name="line.3043"></a> +<span class="sourceLineNo">3044</span> if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {<a name="line.3044"></a> +<span class="sourceLineNo">3045</span> region.updateSequenceId(familyCellMaps[index].values(), writeNumber);<a name="line.3045"></a> +<span class="sourceLineNo">3046</span> }<a name="line.3046"></a> +<span class="sourceLineNo">3047</span> applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);<a name="line.3047"></a> +<span class="sourceLineNo">3048</span> return true;<a name="line.3048"></a> +<span class="sourceLineNo">3049</span> });<a name="line.3049"></a> +<span class="sourceLineNo">3050</span> // update memStore size<a name="line.3050"></a> +<span class="sourceLineNo">3051</span> region.addAndGetMemStoreSize(memStoreAccounting);<a name="line.3051"></a> +<span class="sourceLineNo">3052</span> }<a name="line.3052"></a> +<span class="sourceLineNo">3053</span><a name="line.3053"></a> +<span class="sourceLineNo">3054</span> public boolean isDone() {<a name="line.3054"></a> +<span class="sourceLineNo">3055</span> return nextIndexToProcess == operations.length;<a name="line.3055"></a> +<span class="sourceLineNo">3056</span> }<a name="line.3056"></a> +<span class="sourceLineNo">3057</span><a name="line.3057"></a> +<span class="sourceLineNo">3058</span> public int size() {<a name="line.3058"></a> +<span class="sourceLineNo">3059</span> return operations.length;<a name="line.3059"></a> +<span class="sourceLineNo">3060</span> }<a name="line.3060"></a> +<span class="sourceLineNo">3061</span><a name="line.3061"></a> +<span class="sourceLineNo">3062</span> public boolean isOperationPending(int index) {<a name="line.3062"></a> +<span class="sourceLineNo">3063</span> return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;<a name="line.3063"></a> +<span class="sourceLineNo">3064</span> }<a name="line.3064"></a> +<span class="sourceLineNo">3065</span><a name="line.3065"></a> +<span class="sourceLineNo">3066</span> public List<UUID> getClusterIds() {<a name="line.3066"></a> +<span class="sourceLineNo">3067</span> assert size() != 0;<a name="line.3067"></a> +<span class="sourceLineNo">3068</span> return getMutation(0).getClusterIds();<a name="line.3068"></a> +<span class="sourceLineNo">3069</span> }<a name="line.3069"></a> +<span class="sourceLineNo">3070</span><a name="line.3070"></a> +<span class="sourceLineNo">3071</span> boolean isAtomic() {<a name="line.3071"></a> +<span class="sourceLineNo">3072</span> return atomic;<a name="line.3072"></a> +<span class="sourceLineNo">3073</span> }<a name="line.3073"></a> +<span class="sourceLineNo">3074</span><a name="line.3074"></a> +<span class="sourceLineNo">3075</span> /**<a name="line.3075"></a> +<span class="sourceLineNo">3076</span> * Helper method that checks and prepares only one mutation. This can be used to implement<a name="line.3076"></a> +<span class="sourceLineNo">3077</span> * {@link #checkAndPrepare()} for entire Batch.<a name="line.3077"></a> +<span class="sourceLineNo">3078</span> * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called<a name="line.3078"></a> +<span class="sourceLineNo">3079</span> * after prePut()/ preDelete() CP hooks are run for the mutation<a name="line.3079"></a> +<span class="sourceLineNo">3080</span> */<a name="line.3080"></a> +<span class="sourceLineNo">3081</span> protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)<a name="line.3081"></a> +<span class="sourceLineNo">3082</span> throws IOException {<a name="line.3082"></a> +<span class="sourceLineNo">3083</span> region.checkRow(mutation.getRow(), "batchMutate");<a name="line.3083"></a> +<span class="sourceLineNo">3084</span> if (mutation instanceof Put) {<a name="line.3084"></a> +<span class="sourceLineNo">3085</span> // Check the families in the put. If bad, skip this one.<a name="line.3085"></a> +<span class="sourceLineNo">3086</span> checkAndPreparePut((Put) mutation);<a name="line.3086"></a> +<span class="sourceLineNo">3087</span> region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);<a name="line.3087"></a> +<span class="sourceLineNo">3088</span> } else {<a name="line.3088"></a> +<span class="sourceLineNo">3089</span> region.prepareDelete((Delete) mutation);<a name="line.3089"></a> +<span class="sourceLineNo">3090</span> }<a name="line.3090"></a> +<span class="sourceLineNo">3091</span> }<a name="line.3091"></a> <span class="sourceLineNo">3092</span><a name="line.3092"></a> -<span class="sourceLineNo">3093</span> // store the family map reference to allow for mutations<a name="line.3093"></a> -<span class="sourceLineNo">3094</span> familyCellMaps[index] = mutation.getFamilyCellMap();<a name="line.3094"></a> -<span class="sourceLineNo">3095</span> // store durability for the batch (highest durability of all operations in the batch)<a name="line.3095"></a> -<span class="sourceLineNo">3096</span> Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());<a name="line.3096"></a> -<span class="sourceLineNo">3097</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3097"></a> -<span class="sourceLineNo">3098</span> durability = tmpDur;<a name="line.3098"></a> -<span class="sourceLineNo">3099</span> }<a name="line.3099"></a> -<span class="sourceLineNo">3100</span> } catch (NoSuchColumnFamilyException nscf) {<a name="line.3100"></a> -<span class="sourceLineNo">3101</span> final String msg = "No such column family in batch mutation. ";<a name="line.3101"></a> -<span class="sourceLineNo">3102</span> if (observedExceptions.hasSeenNoSuchFamily()) {<a name="line.3102"></a> -<span class="sourceLineNo">3103</span> LOG.warn(msg + nscf.getMessage());<a name="line.3103"></a> -<span class="sourceLineNo">3104</span> } else {<a name="line.3104"></a> -<span class="sourceLineNo">3105</span> LOG.warn(msg, nscf);<a name="line.3105"></a> -<span class="sourceLineNo">3106</span> observedExceptions.sawNoSuchFamily();<a name="line.3106"></a> -<span class="sourceLineNo">3107</span> }<a name="line.3107"></a> -<span class="sourceLineNo">3108</span> retCodeDetails[index] = new OperationStatus(<a name="line.3108"></a> -<span class="sourceLineNo">3109</span> OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3109"></a> -<span class="sourceLineNo">3110</span> } catch (FailedSanityCheckException fsce) {<a name="line.3110"></a> -<span class="sourceLineNo">3111</span> final String msg = "Batch Mutation did not pass sanity check. ";<a name="line.3111"></a> -<span class="sourceLineNo">3112</span> if (observedExceptions.hasSeenFailedSanityCheck()) {<a name="line.3112"></a> -<span class="sourceLineNo">3113</span> LOG.warn(msg + fsce.getMessage());<a name="line.3113"></a> -<span class="sourceLineNo">3114</span> } else {<a name="line.3114"></a> -<span class="sourceLineNo">3115</span> LOG.warn(msg, fsce);<a name="line.3115"></a> -<span class="sourceLineNo">3116</span> observedExceptions.sawFailedSanityCheck();<a name="line.3116"></a> +<span class="sourceLineNo">3093</span> protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {<a name="line.3093"></a> +<span class="sourceLineNo">3094</span> Mutation mutation = getMutation(index);<a name="line.3094"></a> +<span class="sourceLineNo">3095</span> try {<a name="line.3095"></a> +<span class="sourceLineNo">3096</span> this.checkAndPrepareMutation(mutation, timestamp);<a name="line.3096"></a> +<span class="sourceLineNo">3097</span><a name="line.3097"></a> +<span class="sourceLineNo">3098</span> // store the family map reference to allow for mutations<a name="line.3098"></a> +<span class="sourceLineNo">3099</span> familyCellMaps[index] = mutation.getFamilyCellMap();<a name="line.3099"></a> +<span class="sourceLineNo">3100</span> // store durability for the batch (highest durability of all operations in the batch)<a name="line.3100"></a> +<span class="sourceLineNo">3101</span> Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());<a name="line.3101"></a> +<span class="sourceLineNo">3102</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3102"></a> +<span class="sourceLineNo">3103</span> durability = tmpDur;<a name="line.3103"></a> +<span class="sourceLineNo">3104</span> }<a name="line.3104"></a> +<span class="sourceLineNo">3105</span> } catch (NoSuchColumnFamilyException nscfe) {<a name="line.3105"></a> +<span class="sourceLineNo">3106</span> final String msg = "No such column family in batch mutation. ";<a name="line.3106"></a> +<span class="sourceLineNo">3107</span> if (observedExceptions.hasSeenNoSuchFamily()) {<a name="line.3107"></a> +<span class="sourceLineNo">3108</span> LOG.warn(msg + nscfe.getMessage());<a name="line.3108"></a> +<span class="sourceLineNo">3109</span> } else {<a name="line.3109"></a> +<span class="sourceLineNo">3110</span> LOG.warn(msg, nscfe);<a name="line.3110"></a> +<span class="sourceLineNo">3111</span> observedExceptions.sawNoSuchFamily();<a name="line.3111"></a> +<span class="sourceLineNo">3112</span> }<a name="line.3112"></a> +<span class="sourceLineNo">3113</span> retCodeDetails[index] = new OperationStatus(<a name="line.3113"></a> +<span class="sourceLineNo">3114</span> OperationStatusCode.BAD_FAMILY, nscfe.getMessage());<a name="line.3114"></a> +<span class="sourceLineNo">3115</span> if (isAtomic()) { // fail, atomic means all or none<a name="line.3115"></a> +<span class="sourceLineNo">3116</span> throw nscfe;<a name="line.3116"></a> <span class="sourceLineNo">3117</span> }<a name="line.3117"></a> -<span class="sourceLineNo">3118</span> retCodeDetails[index] = new OperationStatus(<a name="line.3118"></a> -<span class="sourceLineNo">3119</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3119"></a> -<span class="sourceLineNo">3120</span> } catch (WrongRegionException we) {<a name="line.3120"></a> -<span class="sourceLineNo">3121</span> final String msg = "Batch mutation had a row that does not belong to this region. ";<a name="line.3121"></a> -<span class="sourceLineNo">3122</span> if (observedExceptions.hasSeenWrongRegion()) {<a name="line.3122"></a> -<span class="sourceLineNo">3123</span> LOG.warn(msg + we.getMessage());<a name="line.3123"></a> -<span class="sourceLineNo">3124</span> } else {<a name="line.3124"></a> -<span class="sourceLineNo">3125</span> LOG.warn(msg, we);<a name="line.3125"></a> -<span class="sourceLineNo">3126</span> observedExceptions.sawWrongRegion();<a name="line.3126"></a> -<span class="sourceLineNo">3127</span> }<a name="line.3127"></a> -<span class="sourceLineNo">3128</span> retCodeDetails[index] = new OperationStatus(<a name="line.3128"></a> -<span class="sourceLineNo">3129</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3129"></a> -<span class="sourceLineNo">3130</span> }<a name="line.3130"></a> -<span class="sourceLineNo">3131</span> }<a name="line.3131"></a> -<span class="sourceLineNo">3132</span><a name="line.3132"></a> -<span class="sourceLineNo">3133</span> /**<a name="line.3133"></a> -<span class="sourceLineNo">3134</span> * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which<a name="line.3134"></a> -<span class="sourceLineNo">3135</span> * a row lock can be acquired. All mutations with locked rows are considered to be<a name="line.3135"></a> -<span class="sourceLineNo">3136</span> * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch<a name="line.3136"></a> -<span class="sourceLineNo">3137</span> * is window over {@link BatchOperation} and contains contiguous pending operations.<a name="line.3137"></a> -<span class="sourceLineNo">3138</span> *<a name="line.3138"></a> -<span class="sourceLineNo">3139</span> * @param acquiredRowLocks keeps track of rowLocks acquired.<a name="line.3139"></a> -<span class="sourceLineNo">3140</span> */<a name="line.3140"></a> -<span class="sourceLineNo">3141</span> public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(<a name="line.3141"></a> -<span class="sourceLineNo">3142</span> List<RowLock> acquiredRowLocks) throws IOException {<a name="line.3142"></a> -<span class="sourceLineNo">3143</span> int readyToWriteCount = 0;<a name="line.3143"></a> -<span class="sourceLineNo">3144</span> int lastIndexExclusive = 0;<a name="line.3144"></a> -<span class="sourceLineNo">3145</span> for (; lastIndexExclusive < size(); lastIndexExclusive++) {<a name="line.3145"></a> -<span class="sourceLineNo">3146</span> if (!isOperationPending(lastIndexExclusive)) {<a name="line.3146"></a> -<span class="sourceLineNo">3147</span> continue;<a name="line.3147"></a> -<span class="sourceLineNo">3148</span> }<a name="line.3148"></a> -<span class="sourceLineNo">3149</span> Mutation mutation = getMutation(lastIndexExclusive);<a name="line.3149"></a> -<span class="sourceLineNo">3150</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3150"></a> -<span class="sourceLineNo">3151</span> RowLock rowLock = null;<a name="line.3151"></a> -<span class="sourceLineNo">3152</span> try {<a name="line.3152"></a> -<span class="sourceLineNo">3153</span> rowLock = region.getRowLockInternal(mutation.getRow(), true);<a name="line.3153"></a> -<span class="sourceLineNo">3154</span> } catch (TimeoutIOException e) {<a name="line.3154"></a> -<span class="sourceLineNo">3155</span> // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3155"></a> -<span class="sourceLineNo">3156</span> throw e;<a name="line.3156"></a> -<span class="sourceLineNo">3157</span> } catch (IOException ioe) {<a name="line.3157"></a> -<span class="sourceLineNo">3158</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3158"></a> -<span class="sourceLineNo">3159</span> }<a name="line.3159"></a> -<span class="sourceLineNo">3160</span> if (rowLock == null) {<a name="line.3160"></a> -<span class="sourceLineNo">3161</span> // We failed to grab another lock<a name="line.3161"></a> -<span class="sourceLineNo">3162</span> break; // Stop acquiring more rows for this batch<a name="line.3162"></a> -<span class="sourceLineNo">3163</span> } else {<a name="line.3163"></a> -<span class="sourceLineNo">3164</span> acquiredRowLocks.add(rowLock);<a name="line.3164"></a> -<span class="sourceLineNo">3165</span> }<a name="line.3165"></a> -<span class="sourceLineNo">3166</span> readyToWriteCount++;<a name="line.3166"></a> -<span class="sourceLineNo">3167</span> }<a name="line.3167"></a> -<span class="sourceLineNo">3168</span> return createMiniBatch(lastIndexExclusive, readyToWriteCount);<a name="line.3168"></a> -<span class="sourceLineNo">3169</span> }<a name="line.3169"></a> -<span class="sourceLineNo">3170</span><a name="line.3170"></a> -<span class="sourceLineNo">3171</span> protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastIndexExclusive,<a name="line.3171"></a> -<span class="sourceLineNo">3172</span> final int readyToWriteCount) {<a name="line.3172"></a> -<span class="sourceLineNo">3173</span> return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails,<a name="line.3173"></a> -<span class="sourceLineNo">3174</span> walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);<a name="line.3174"></a> -<span class="sourceLineNo">3175</span> }<a name="line.3175"></a> -<span class="sourceLineNo">3176</span><a name="line.3176"></a> -<span class="sourceLineNo">3177</span> /**<a name="line.3177"></a> -<span class="sourceLineNo">3178</span> * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are<a name="line.3178"></a> -<span class="sourceLineNo">3179</span> * present, they are merged to result WALEdit.<a name="line.3179"></a> -<span class="sourceLineNo">3180</span> */<a name="line.3180"></a> -<span class="sourceLineNo">3181</span> public List<Pair<NonceKey, WALEdit>> buildWALEdits(<a name="line.3181"></a> -<span class="sourceLineNo">3182</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {<a name="line.3182"></a> -<span class="sourceLineNo">3183</span> List<Pair<NonceKey, WALEdit>> walEdits = new ArrayList<>();<a name="line.3183"></a> -<span class="sourceLineNo">3184</span><a name="line.3184"></a> -<span class="sourceLineNo">3185</span> visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {<a name="line.3185"></a> -<span class="sourceLineNo">3186</span> private Pair<NonceKey, WALEdit> curWALEditForNonce;<a name="line.3186"></a> -<span class="sourceLineNo">3187</span> @Override<a name="line.3187"></a> -<span class="sourceLineNo">3188</span> public boolean visit(int index) throws IOException {<a name="line.3188"></a> -<span class="sourceLineNo">3189</span> Mutation m = getMutation(index);<a name="line.3189"></a> -<span class="sourceLineNo">3190</span> // we use durability of the original mutation for the mutation passed by CP.<a name="line.3190"></a> -<span class="sourceLineNo">3191</span> if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {<a name="line.3191"></a> -<span class="sourceLineNo">3192</span> region.recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3192"></a> -<span class="sourceLineNo">3193</span> return true;<a name="line.3193"></a> -<span class="sourceLineNo">3194</span> }<a name="line.3194"></a> -<span class="sourceLineNo">3195</span><a name="line.3195"></a> -<span class="sourceLineNo">3196</span> // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.<a name="line.3196"></a> -<span class="sourceLineNo">3197</span> // Given how nonce keys are originally written, these should be contiguous.<a name="line.3197"></a> -<span class="sourceLineNo">3198</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3198"></a> -<span class="sourceLineNo">3199</span> long nonceGroup = getNonceGroup(index);<a name="line.3199"></a> -<span class="sourceLineNo">3200</span> long nonce = getNonce(index);<a name="line.3200"></a> -<span class="sourceLineNo">3201</span> if (curWALEditForNonce == null ||<a name="line.3201"></a> -<span class="sourceLineNo">3202</span> curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||<a name="line.3202"></a> -<span class="sourceLineNo">3203</span> curWALEditForNonce.getFirst().getNonce() != nonce) {<a name="line.3203"></a> -<span class="sourceLineNo">3204</span> curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),<a name="line.3204"></a> -<span class="sourceLineNo">3205</span> new WALEdit(miniBatchOp.getCellCount(), isInReplay()));<a name="line.3205"></a> -<span class="sourceLineNo">3206</span> walEdits.add(curWALEditForNonce);<a name="line.3206"></a> -<span class="sourceLineNo">3207</span> }<a name="line.3207"></a> -<span class="sourceLineNo">3208</span> WALEdit walEdit = curWALEditForNonce.getSecond();<a name="line.3208"></a> -<span class="sourceLineNo">3209</span><a name="line.3209"></a> -<span class="sourceLineNo">3210</span> // Add WAL edits by CP<a name="line.3210"></a> -<span class="sourceLineNo">3211</span> WALEdit fromCP = walEditsFromCoprocessors[index];<a name="line.3211"></a> -<span class="sourceLineNo">3212</span> if (fromCP != null) {<a name="line.3212"></a> -<span class="sourceLineNo">3213</span> for (Cell cell : fromCP.getCells()) {<a name="line.3213"></a> -<span class="sourceLineNo">3214</span> walEdit.add(cell);<a name="line.3214"></a> -<span class="sourceLineNo">3215</span> }<a name="line.3215"></a> -<span class="sourceLineNo">3216</span> }<a name="line.3216"></a> -<span class="sourceLineNo">3217</span> addFamilyMapToWALEdit(familyCellMaps[index], walEdit);<a name="line.3217"></a> -<span class="sourceLineNo">3218</span><a name="line.3218"></a> -<span class="sourceLineNo">3219</span> return true;<a name="line.3219"></a> -<span class="sourceLineNo">3220</span> }<a name="line.3220"></a> -<span class="sourceLineNo">3221</span> });<a name="line.3221"></a> -<span class="sourceLineNo">3222</span> return walEdits;<a name="line.3222"></a> -<span class="sourceLineNo">3223</span> }<a name="line.3223"></a> -<span class="sourceLineNo">3224</span><a name="line.3224"></a> -<span class="sourceLineNo">3225</span> /**<a name="line.3225"></a> -<span class="sourceLineNo">3226</span> * This method completes mini-batch operations by calling postBatchMutate() CP hook (if<a name="line.3226"></a> -<span class="sourceLineNo">3227</span> * required) and completing mvcc.<a name="line.3227"></a> -<span class="sourceLineNo">3228</span> */<a name="line.3228"></a> -<span class="sourceLineNo">3229</span> public void completeMiniBatchOperations(<a name="line.3229"></a> -<span class="sourceLineNo">3230</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)<a name="line.3230"></a> -<span class="sourceLineNo">3231</span> throws IOException {<a name="line.3231"></a> -<span class="sourceLineNo">3232</span> if (writeEntry != null) {<a name="line.3232"></a> -<span class="sourceLineNo">3233</span> region.mvcc.completeAndWait(writeEntry);<a name="line.3233"></a> -<span class="sourceLineNo">3234</span> }<a name="line.3234"></a> -<span class="sourceLineNo">3235</span> }<a name="line.3235"></a> -<span class="sourceLineNo">3236</span><a name="line.3236"></a> -<span class="sourceLineNo">3237</span> public void doPostOpCleanupForMiniBatch(<a name="line.3237"></a> -<span class="sourceLineNo">3238</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,<a name="line.3238"></a> -<span class="sourceLineNo">3239</span> boolean success) throws IOException {}<a name="line.3239"></a> -<span class="sourceLineNo">3240</span><a name="line.3240"></a> -<span class="sourceLineNo">3241</span> /**<a name="line.3241"></a> -<span class="sourceLineNo">3242</span> * Atomically apply the given map of family->edits to the memstore.<a name="line.3242"></a> -<span class="sourceLineNo">3243</span> * This handles the consistency control on its own, but the caller<a name="line.3243"></a> -<span class="sourceLineNo">3244</span> * should already have locked updatesLock.readLock(). This also does<a name="line.3244"></a> -<span class="sourceLineNo">3245</span> * <b>not</b> check the families for validity.<a name="line.3245"></a> -<span class="sourceLineNo">3246</span> *<a name="line.3246"></a> -<span class="sourceLineNo">3247</span> * @param familyMap Map of Cells by family<a name="line.3247"></a> -<span class="sourceLineNo">3248</span> */<a name="line.3248"></a> -<span class="sourceLineNo">3249</span> protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,<a name="line.3249"></a> -<span class="sourceLineNo">3250</span> MemStoreSizing memstoreAccounting) throws IOException {<a name="line.3250"></a> -<span class="sourceLineNo">3251</span> for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {<a name="line.3251"></a> -<span class="sourceLineNo">3252</span> byte[] family = e.getKey();<a name="line.3252"></a> -<span class="sourceLineNo">3253</span> List<Cell> cells = e.getValue();<a name="line.3253"></a> -<span class="sourceLineNo">3254</span> assert cells instanceof RandomAccess;<a name="line.3254"></a> -<span class="sourceLineNo">3255</span> region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);<a name="line.3255"></a> -<span class="sourceLineNo">3256</span> }<a name="line.3256"></a> -<span class="sourceLineNo">3257</span> }<a name="line.3257"></a> -<span class="sourceLineNo">3258</span><a name="line.3258"></a> -<span class="sourceLineNo">3259</span> /**<a name="line.3259"></a> -<span class="sourceLineNo">3260</span> * Append the given map of family->edits to a WALEdit data structure.<a name="line.3260"></a> -<span class="sourceLineNo">3261</span> * This does not write to the WAL itself.<a name="line.3261"></a> -<span class="sourceLineNo">3262</span> * @param familyMap map of family->edits<a name="line.3262"></a> -<span class="sourceLineNo">3263</span> * @param walEdit the destination entry to append into<a name="line.3263"></a> -<span class="sourceLineNo">3264</span> */<a name="line.3264"></a> -<span class="sourceLineNo">3265</span> private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,<a name="line.3265"></a> -<span class="sourceLineNo">3266</span> WALEdit walEdit) {<a name="line.3266"></a> -<span class="sourceLineNo">3267</span> for (List<Cell> edits : familyMap.values()) {<a name="line.3267"></a> -<span class="sourceLineNo">3268</span> assert edits instanceof RandomAccess;<a name="line.3268"></a> -<span class="sourceLineNo">3269</span> int listSize = edits.size();<a name="line.3269"></a> -<span class="sourceLineNo">3270</span> for (int i=0; i < listSize; i++) {<a name="line.3270"></a> -<span class="sourceLineNo">3271</span> Cell cell = edits.get(i);<a name="line.3271"></a> -<span class="sourceLineNo">3272</span> walEdit.add(cell);<a name="line.3272"></a> -<span class="sourceLineNo">3273</span> }<a name="line.3273"></a> -<span class="sourceLineNo">3274</span> }<a name="line.3274"></a> -<span class="sourceLineNo">3275</span> }<a name="line.3275"></a> -<span class="sourceLineNo">3276</span> }<a name="line.3276"></a> -<span class="sourceLineNo">3277</span><a name="line.3277"></a> -<span class="sourceLineNo">3278</span> /**<a name="line.3278"></a> -<span class="sourceLineNo">3279</span> * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most<a name="line.3279"></a> -<span class="sourceLineNo">3280</span> * of the logic is same.<a name="line.3280"></a> -<span class="sourceLineNo">3281</span> */<a name="line.3281"></a> -<span class="sourceLineNo">3282</span> private static class MutationBatchOperation extends BatchOperation<Mutation> {<a name="line.3282"></a> -<span class="sourceLineNo">3283</span> private long nonceGroup;<a name="line.3283"></a> -<span class="sourceLineNo">3284</span> private long nonce;<a name="line.3284"></a> -<span class="sourceLineNo">3285</span> public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,<a name="line.3285"></a> -<span class="sourceLineNo">3286</span> long nonce) {<a name="line.3286"></a> -<span class="sourceLineNo">3287</span> super(region, operations);<a name="line.3287"></a> -<span class="sourceLineNo">3288</span> this.nonceGroup = nonceGroup;<a name="line.3288"></a> -<span class="sourceLineNo">3289</span> this.nonce = nonce;<a name="line.3289"></a> -<span class="sourceLineNo">3290</span> }<a name="line.3290"></a> -<span class="sourceLineNo">3291</span><a name="line.3291"></a> -<span class="sourceLineNo">3292</span> @Override<a name="line.3292"></a> -<span class="sourceLineNo">3293</span> public Mutation getMutation(int index) {<a name="line.3293"></a> -<span class="sourceLineNo">3294</span> return this.operations[index];<a name="line.3294"></a> -<span class="sourceLineNo">3295</span> }<a name="line.3295"></a> -<span class="sourceLineNo">3296</span><a name="line.3296"></a> -<span class="sourceLineNo">3297</span> @Override<a name="line.3297"></a> -<span class="sourceLineNo">3298</span> public long getNonceGroup(int index) {<a name="line.3298"></a> -<span class="sourceLineNo">3299</span> return nonceGroup;<a name="line.3299"></a> -<span class="sourceLineNo">3300</span> }<a name="line.3300"></a> -<span class="sourceLineNo">3301</span><a name="line.3301"></a> -<span class="sourceLineNo">3302</span> @Override<a name="line.3302"></a> -<span class="sourceLineNo">3303</span> public long getNonce(int index) {<a name="line.3303"></a> -<span class="sourceLineNo">3304</span> return nonce;<a name="line.3304"></a> -<span class="sourceLineNo">3305</span> }<a name="line.3305"></a> -<span class="sourceLineNo">3306</span><a name="line.3306"></a> -<span class="sourceLineNo">3307</span> @Override<a name="line.3307"></a> -<span class="sourceLineNo">3308</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3308"></a> -<span class="sourceLineNo">3309</span> return this.operations;<a name="line.3309"></a> -<span class="sourceLineNo">3310</span> }<a name="line.3310"></a> -<span class="sourceLineNo">3311</span><a name="line.3311"></a> -<span class="sourceLineNo">3312</span> @Override<a name="line.3312"></a> -<span class="sourceLineNo">3313</span> public boolean isInReplay() {<a name="line.3313"></a> -<span class="sourceLineNo">3314</span> return false;<a name="line.3314"></a> -<span class="sourceLineNo">3315</span> }<a name="line.3315"></a> -<span class="sourceLineNo">3316</span><a name="line.3316"></a> -<span class="sourceLineNo">3317</span> @Override<a name="line.3317"></a> -<span class="sourceLineNo">3318</span> public long getOrigLogSeqNum() {<a name="line.3318"></a> -<span class="sourceLineNo">3319</span> return WALKey.NO_SEQUENCE_ID;<a name="line.3319"></a> -<span class="sourceLineNo">3320</span> }<a name="line.3320"></a> -<span class="sourceLineNo">3321</span><a name="line.3321"></a> -<span class="sourceLineNo">3322</span> @Override<a name="line.3322"></a> -<span class="sourceLineNo">3323</span> public void startRegionOperation() throws IOException {<a name="line.3323"></a> -<span class="sourceLineNo">3324</span> region.startRegionOperation(Operation.BATCH_MUTATE);<a name="line.3324"></a> -<span class="sourceLineNo">3325</span> }<a name="line.3325"></a> -<span class="sourceLineNo">3326</span><a name="line.3326"></a> -<span class="sourceLineNo">3327</span> @Override<a name="line.3327"></a> -<span class="sourceLineNo">3328</span> public void closeRegionOperation() throws IOException {<a name="line.3328"></a> -<span class="sourceLineNo">3329</span> region.closeRegionOperation(Operation.BATCH_MUTATE);<a name="line.3329"></a> -<span class="sourceLineNo">3330</span> }<a name="line.3330"></a> -<span class="sourceLineNo">3331</span><a name="line.3331"></a> -<span class="sourceLineNo">3332</span> @Override<a name="line.3332"></a> -<span class="sourceLineNo">3333</span> public void checkAndPreparePut(Put p) throws IOException {<a name="line.3333"></a> -<span class="sourceLineNo">3334</span> region.checkFamilies(p.getFamilyCellMap().keySet());<a name="line.3334"></a> -<span class="sourceLineNo">3335</span> }<a name="line.3335"></a> -<span class="sourceLineNo">3336</span><a name="line.3336"></a> -<span class="sourceLineNo">3337</span> @Override<a name="line.3337"></a> -<span class="sourceLineNo">3338</span> public void checkAndPrepare() throws IOException {<a name="line.3338"></a> -<span class="sourceLineNo">3339</span> final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes<a name="line.3339"></a> -<span class="sourceLineNo">3340</span> visitBatchOperations(true, this.size(), new Visitor() {<a name="line.3340"></a> -<span class="sourceLineNo">3341</span> private long now = EnvironmentEdgeManager.currentTime();<a name="line.3341"></a> -<span class="sourceLineNo">3342</span> private WALEdit walEdit;<a name="line.3342"></a> -<span class="sourceLineNo">3343</span> @Override<a name="line.3343"></a> -<span class="sourceLineNo">3344</span> public boolean visit(int index) throws IOException {<a name="line.3344"></a> -<span class="sourceLineNo">3345</span> // Run coprocessor pre hook outside of locks to avoid deadlock<a name="line.3345"></a> -<span class="sourceLineNo">3346</span> if (region.coprocessorHost != null) {<a name="line.3346"></a> -<span class="sourceLineNo">3347</span> if (walEdit == null) {<a name="line.3347"></a> -<span class="sourceLineNo">3348</span> walEdit = new WALEdit();<a name="line.3348"></a> -<span class="sourceLineNo">3349</span> }<a name="line.3349"></a> -<span class="sourceLineNo">3350</span> callPreMutateCPHook(index, walEdit, metrics);<a name="line.3350"></a> -<span class="sourceLineNo">3351</span> if (!walEdit.isEmpty()) {<a name="line.3351"></a> -<span class="sourceLineNo">3352</span> walEditsFromCoprocessors[index] = walEdit;<a name="line.3352"></a> -<span class="sourceLineNo">3353</span> walEdit = null;<a name="line.3353"></a> -<span class="sourceLineNo">3354</span> }<a name="line.3354"></a> -<span class="sourceLineNo">3355</span> }<a name="line.3355"></a> -<span class="sourceLineNo">3356</span> if (isOperationPending(index)) {<a name="line.3356"></a> -<span class="sourceLineNo">3357</span> // TODO: Currently validation is done with current time before acquiring locks and<a name="line.3357"></a> -<span class="sourceLineNo">3358</span> // updates are done with different timestamps after acquiring locks. This behavior is<a name="line.3358"></a> -<span class="sourceLineNo">3359</span> // inherited from the code prior to this change. Can this be changed?<a name="line.3359"></a> -<span class="sourceLineNo">3360</span> checkAndPrepareMutation(index, now);<a name="line.3360"></a> -<span class="sourceLineNo">3361</span> }<a name="line.3361"></a> -<span class="sourceLineNo">3362</span> return true;<a name="line.3362"></a> -<span class="sourceLineNo">3363</span> }<a name="line.3363"></a> -<span class="sourceLineNo">3364</span> });<a name="line.3364"></a> -<span class="sourceLineNo">3365</span><a name="line.3365"></a> -<span class="sourceLineNo">3366</span> // FIXME: we may update metrics twice! here for all operations bypassed by CP and later in<a name="line.3366"></a> -<span class="sourceLineNo">3367</span> // normal processing.<a name="line.3367"></a> -<span class="sourceLineNo">3368</span> // Update metrics in same way as it is done when we go the normal processing route (we now<a name="line.3368"></a> -<span class="sourceLineNo">3369</span> // update general metrics though a Coprocessor did the work).<a name="line.3369"></a> -<span class="sourceLineNo">3370</span> if (region.metricsRegion != null) {<a name="line.3370"></a> -<span class="sourceLineNo">3371</span> if (metrics[0] > 0) {<a name="line.3371"></a> -<span class="sourceLineNo">3372</span> // There were some Puts in the batch.<a name="line.3372"></a> -<span class="sourceLineNo">3373</span> region.metricsRegion.updatePut();<a name="line.3373"></a> -<span class="sourceLineNo">3374</span> }<a name="line.3374"></a> -<span class="sourceLineNo">3375</span> if (metrics[1] > 0) {<a name="line.3375"></a> -<span class="sourceLineNo">3376</span> // There were some Deletes in the batch.<a name="line.3376"></a> -<span class="sourceLineNo">3377</span> region.metricsRegion.updateDelete();<a name="line.3377"></a> -<span class="sourceLineNo">3378</span> }<a name="line.3378"></a> -<span class="sourceLineNo">3379</span> }<a name="line.3379"></a> -<span class="sourceLineNo">3380</span> }<a name="line.3380"></a> -<span class="sourceLineNo">3381</span><a name="line.3381"></a> -<span class="sourceLineNo">3382</span> @Override<a name="line.3382"></a> -<span class="sourceLineNo">3383</span> public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,<a name="line.3383"></a> -<span class="sourceLineNo">3384</span> long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {<a name="line.3384"></a> -<span class="sourceLineNo">3385</span> byte[] byteTS = Bytes.toBytes(timestamp);<a name="line.3385"></a> -<span class="sourceLineNo">3386</span> visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {<a name="line.3386"></a> -<span class="sourceLineNo">3387</span> Mutation mutation = getMutation(index);<a name="line.3387"></a> -<span class="sourceLineNo">3388</span> if (mutation instanceof Put) {<a name="line.3388"></a> -<span class="sourceLineNo">3389</span> region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);<a name="line.3389"></a> -<span class="sourceLineNo">3390</span> miniBatchOp.incrementNumOfPuts();<a name="line.3390"></a> -<span class="sourceLineNo">3391</span> } else {<a name="line.3391"></a> -<span class="sourceLineNo">3392</span> region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);<a name="line.3392"></a> -<span class="sourceLineNo">3393</span> miniBatchOp.incrementNumOfDeletes();<a name="line.3393"></a> -<span class="sourceLineNo">3394</span> }<a name="line.3394"></a> -<span class="sourceLineNo">3395</span> region.rewriteCellTags(familyCellMaps[index], mutation);<a name="line.3395"></a> -<span class="sourceLineNo">3396</span><a name="line.3396"></a> -<span class="sourceLineNo">3397</span> // update cell count<a name="line.3397"></a> -<span class="sourceLineNo">3398</span> if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {<a name="line.3398"></a> -<span class="sourceLineNo">3399</span> for (List<Cell> cells : mutation.getFamilyCellMap().values()) {<a name="line.3399"></a> -<span class="sourceLineNo">3400</span> miniBatchOp.addCellCount(cells.size());<a name="line.3400"></a> -<span class="sourceLineNo">3401</span> }<a name="line.3401"></a> -<span class="sourceLineNo">3402</span> }<a name="line.3402"></a> +<span class="sourceLineNo">3118</span> } catch (FailedSanityCheckException fsce) {<a name="line.3118"></a> +<span class="sourceLineNo">3119</span> final String msg = "Batch Mutation did not pass sanity check. ";<a name="line.3119"></a> +<span class="sourceLineNo">3120</span> if (observedExceptions.hasSeenFailedSanityCheck()) {<a name="line.3120"></a> +<span class="sourceLineNo">3121</span> LOG.warn(msg + fsce.getMessage());<a name="line.3121"></a> +<span class="sourceLineNo">3122</span> } else {<a name="line.3122"></a> +<span class="sourceLineNo">3123</span> LOG.warn(msg, fsce);<a name="line.3123"></a> +<span class="sourceLineNo">3124</span> observedExceptions.sawFailedSanityCheck();<a name="line.3124"></a> +<span class="sourceLineNo">3125</span> }<a name="line.3125"></a> +<span class="sourceLineNo">3126</span> retCodeDetails[index] = new OperationStatus(<a name="line.3126"></a> +<span class="sourceLineNo">3127</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3127"></a> +<span class="sourceLineNo">3128</span> if (isAtomic()) {<a name="line.3128"></a> +<span class="sourceLineNo">3129</span> throw fsce;<a name="line.3129"></a> +<span class="sourceLineNo">3130</span> }<a name="line.3130"></a> +<span class="sourceLineNo">3131</span> } catch (WrongRegionException we) {<a name="line.3131"></a> +<span class="sourceLineNo">3132</span> final String msg = "Batch mutation had a row that does not belong to this region. ";<a name="line.3132"></a> +<span class="sourceLineNo">3133</span> if (observedExceptions.hasSeenWrongRegion()) {<a name="line.3133"></a> +<span class="sourceLineNo">3134</span> LOG.warn(msg + we.getMessage());<a name="line.3134"></a> +<span class="sourceLineNo">3135</span> } else {<a name="line.3135"></a> +<span class="sourceLineNo">3136</span> LOG.warn(msg, we);<a name="line.3136"></a> +<span class="sourceLineNo">3137</span> observedExceptions.sawWrongRegion();<a name="line.3137"></a> +<span class="sourceLineNo">3138</span> }<a name="line.3138"></a> +<span class="sourceLineNo">3139</span> retCodeDetails[index] = new OperationStatus(<a name="line.3139"></a> +<span class="sourceLineNo">3140</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3140"></a> +<span class="sourceLineNo">3141</span> if (isAtomic()) {<a name="line.3141"></a> +<span class="sourceLineNo">3142</span> throw we;<a name="line.3142"></a> +<span class="sourceLineNo">3143</span> }<a name="line.3143"></a> +<span class="sourceLineNo">3144</span> }<a name="line.3144"></a> +<span class="sourceLineNo">3145</span> }<a name="line.3145"></a> +<span class="sourceLineNo">3146</span><a name="line.3146"></a> +<span class="sourceLineNo">3147</span> /**<a name="line.3147"></a> +<span class="sourceLineNo">3148</span> * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which<a name="line.3148"></a> +<span class="sourceLineNo">3149</span> * a row lock can be acquired. All mutations with locked rows are considered to be<a name="line.3149"></a> +<span class="sourceLineNo">3150</span> * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch<a name="line.3150"></a> +<span class="sourceLineNo">3151</span> * is window over {@link BatchOperation} and contains contiguous pending operations.<a name="line.3151"></a> +<span class="sourceLineNo">3152</span> *<a name="line.3152"></a> +<span class="sourceLineNo">3153</span> * @param acquiredRowLocks keeps track of rowLocks acquired.<a name="line.3153"></a> +<span class="sourceLineNo">3154</span> */<a name="line.3154"></a> +<span class="sourceLineNo">3155</span> public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(<a name="line.3155"></a> +<span class="sourceLineNo">3156</span> List<RowLock> acquiredRowLocks) throws IOException {<a name="line.3156"></a> +<span class="sourceLineNo">3157</span> int readyToWriteCount = 0;<a name="line.3157"></a> +<span class="sourceLineNo">3158</span> int lastIndexExclusive = 0;<a name="line.3158"></a> +<span class="sourceLineNo">3159</span> for (; lastIndexExclusive < size(); lastIndexExclusive++) {<a name="line.3159"></a> +<span class="sourceLineNo">3160</span> if (!isOperationPending(lastIndexExclusive)) {<a name="line.3160"></a> +<span class="sourceLineNo">3161</span> continue;<a name="line.3161"></a> +<span class="sourceLineNo">3162</span> }<a name="line.3162"></a> +<span class="sourceLineNo">3163</span> Mutation mutation = getMutation(lastIndexExclusive);<a name="line.3163"></a> +<span class="sourceLineNo">3164</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3164"></a> +<span class="sourceLineNo">3165</span> RowLock rowLock = null;<a name="line.3165"></a> +<span class="sourceLineNo">3166</span> try {<a name="line.3166"></a> +<span class="sourceLineNo">3167</span> // if atomic then get exclusive lock, else shared lock<a name="line.3167"></a> +<span class="sourceLineNo">3168</span> rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());<a name="line.3168"></a> +<span class="sourceLineNo">3169</span> } catch (TimeoutIOException e) {<a name="line.3169"></a> +<span class="sourceLineNo">3170</span> // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3170"></a> +<span class="sourceLineNo">3171</span> throw e;<a name="line.3171"></a> +<span class="sourceLineNo">3172</span> } catch (IOException ioe) {<a name="line.3172"></a> +<span class="sourceLineNo">3173</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3173"></a> +<span class="sourceLineNo">3174</span> if (isAtomic()) { // fail, atomic means all or none<a name="line.3174"></a> +<span class="sourceLineNo">3175</span> throw ioe;<a name="line.3175"></a> +<span class="sourceLineNo">3176</span> }<a name="line.3176"></a> +<span class="sourceLineNo">3177</span> }<a name="line.3177"></a> +<span class="sourceLineNo">3178</span> if (rowLock == null) {<a name="line.3178"></a> +<span class="sourceLineNo">3179</span> // We failed to grab another lock<a name="line.3179"></a> +<span class="sourceLineNo">3180</span> if (isAtomic()) {<a name="line.3180"></a> +<span class="sourceLineNo">3181</span> throw new IOException("Can't apply all operations atomically!");<a name="line.3181"></a> +<span class="sourceLineNo">3182</span> }<a name="line.3182"></a> +<span class="sourceLineNo">3183</span> break; // Stop acquiring more rows for this batch<a name="line.3183"></a> +<span class="sourceLineNo">3184</span> } else {<a name="line.3184"></a> +<span class="sourceLineNo">3185</span> acquiredRowLocks.add(rowLock);<a name="line.3185"></a> +<span class="sourceLineNo">3186</span> }<a name="line.3186"></a> +<span class="sourceLineNo">3187</span> readyToWriteCount++;<a name="line.3187"></a> +<span class="sourceLineNo">3188</span> }<a name="line.3188"></a> +<span class="sourceLineNo">3189</span> return createMiniBatch(lastIndexExclusive, readyToWriteCount);<a name="line.3189"></a> +<span class="sourceLineNo">3190</span> }<a name="line.3190"></a> +<span class="sourceLineNo">3191</span><a name="line.3191"></a> +<span class="sourceLineNo">3192</span> protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastIndexExclusive,<a name="line.3192"></a> +<span class="sourceLineNo">3193</span> final int readyToWriteCount) {<a name="line.3193"></a> +<span class="sourceLineNo">3194</span> return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails,<a name="line.3194"></a> +<span class="sourceLineNo">3195</span> walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);<a name="line.3195"></a> +<span class="sourceLineNo">3196</span> }<a name="line.3196"></a> +<span class="sourceLineNo">3197</span><a name="line.3197"></a> +<span class="sourceLineNo">3198</span> /**<a name="line.3198"></a> +<span class="sourceLineNo">3199</span> * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are<a name="line.3199"></a> +<span class="sourceLineNo">3200</span> * present, they are merged to result WALEdit.<a name="line.3200"></a> +<span class="sourceLineNo">3201</span> */<a name="line.3201"></a> +<span class="sourceLineNo">3202</span> public List<Pair<NonceKey, WALEdit>> buildWALEdits(<a name="line.3202"></a> +<span class="sourceLineNo">3203</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {<a name="line.3203"></a> +<span class="sourceLineNo">3204</span> List<Pair<NonceKey, WALEdit>> walEdits = new ArrayList<>();<a name="line.3204"></a> +<span class="sourceLineNo">3205</span><a name="line.3205"></a> +<span class="sourceLineNo">3206</span> visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {<a name="line.3206"></a> +<span class="sourceLineNo">3207</span> private Pair<NonceKey, WALEdit> curWALEditForNonce;<a name="line.3207"></a> +<span class="sourceLineNo">3208</span> @Override<a name="line.3208"></a> +<span class="sourceLineNo">3209</span> public boolean visit(int index) throws IOException {<a name="line.3209"></a> +<span class="sourceLineNo">3210</span> Mutation m = getMutation(index);<a name="line.3210"></a> +<span class="sourceLineNo">3211</span> // we use durability of the original mutation for the mutation passed by CP.<a name="line.3211"></a> +<span class="sourceLineNo">3212</span> if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {<a name="line.3212"></a> +<span class="sourceLineNo">3213</span> region.recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3213"></a> +<span class="sourceLineNo">3214</span> return true;<a name="line.3214"></a> +<span class="sourceLineNo">3215</span> }<a name="line.3215"></a> +<span class="sourceLineNo">3216</span><a name="line.3216"></a> +<span class="sourceLineNo">3217</span> // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.<a name="line.3217"></a> +<span class="sourceLineNo">3218</span> // Given how nonce keys are originally written, these should be contiguous.<a name="line.3218"></a> +<span class="sourceLineNo">3219</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3219"></a> +<span class="sourceLineNo">3220</span> long nonceGroup = getNonceGroup(index);<a name="line.3220"></a> +<span class="sourceLineNo">3221</span> long nonce = getNonce(index);<a name="line.3221"></a> +<span class="sourceLineNo">3222</span> if (curWALEditForNonce == null ||<a name="line.3222"></a> +<span class="sourceLineNo">3223</span> curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||<a name="line.3223"></a> +<span class="sourceLineNo">3224</span> curWALEditForNonce.getFirst().getNonce() != nonce) {<a name="line.3224"></a> +<span class="sourceLineNo">3225</span> curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),<a name="line.3225"></a> +<span class="sourceLineNo">3226</span> new WALEdit(miniBatchOp.getCellCount(), isInReplay()));<a name="line.3226"></a> +<span class="sourceLineNo">3227</span> walEdits.add(curWALEditForNonce);<a name="line.3227"></a> +<span class="sourceLineNo">3228</span> }<a name="line.3228"></a> +<span class="sourceLineNo">3229</span> WALEdit walEdit = curWALEditForNonce.getSecond();<a name="line.3229"></a> +<span class="sourceLineNo">3230</span><a name="line.3230"></a> +<span class="sourceLineNo">3231</span> // Add WAL edits by CP<a name="line.3231"></a> +<span class="sourceLineNo">3232</span> WALEdit fromCP = walEditsFromCoprocessors[index];<a name="line.3232"></a> +<span class="sourceLineNo">3233</span> if (fromCP != null) {<a name="line.3233"></a> +<span class="sourceLineNo">3234</span> for (Cell cell : fromCP.getCells()) {<a name="line.3234"></a> +<span class="sourceLineNo">3235</span> walEdit.add(cell);<a name="line.3235"></a> +<span class="sourceLineNo">3236</span> }<a name="line.3236"></a> +<span class="sourceLineNo">3237</span> }<a name="line.3237"></a> +<span class="sourceLineNo">3238</span> addFamilyMapToWALEdit(familyCellMaps[index], walEdit);<a name="line.3238"></a> +<span class="sourceLineNo">3239</span><a name="line.3239"></a> +<span class="sourceLineNo">3240</span> return true;<a name="line.3240"></a> +<span class="sourceLineNo">3241</span> }<a name="line.3241"></a> +<span class="sourceLineNo">3242</span> });<a name="line.3242"></a> +<span class="sourceLineNo">3243</span> return walEdits;<a name="line.3243"></a> +<span class="sourceLineNo">3244</span> }<a name="line.3244"></a> +<span class="sourceLineNo">3245</span><a name="line.3245"></a> +<span class="sourceLineNo">3246</span> /**<a name="line.3246"></a> +<span class="sourceLineNo">3247</span> * This method completes mini-batch operations by calling postBatchMutate() CP hook (if<a name="line.3247"></a> +<span class="sourceLineNo">3248</span> * required) and completing mvcc.<a name="line.3248"></a> +<span class="sourceLineNo">3249</span> */<a name="line.3249"></a> +<span class="sourceLineNo">3250</span> public void completeMiniBatchOperations(<a name="line.3250"></a> +<span class="sourceLineNo">3251</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)<a name="line.3251"></a> +<span class="sourceLineNo">3252</span> throws IOException {<a name="line.3252"></a> +<span class="sourceLineNo">3253</span> if (writeEntry != null) {<a name="line.3253"></a> +<span class="sourceLineNo">3254</span> region.mvcc.completeAndWait(writeEntry);<a name="line.3254"></a> +<span class="sourceLineNo">3255</span> }<a name="line.3255"></a> +<span class="sourceLineNo">3256</span> }<a name="line.3256"></a> +<span class="sourceLineNo">3257</span><a name="line.3257"></a> +<span class="sourceLineNo">3258</span> public void doPostOpCleanupForMiniBatch(<a name="line.3258"></a> +<span class="sourceLineNo">3259</span> final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,<a name="line.3259"></a> +<span class="sourceLineNo">3260</span> boolean success) throws IOException {}<a name="line.3260"></a> +<span class="sourceLineNo">3261</span><a name="line.3261"></a> +<span class="sourceLineNo">3262</span> /**<a name="line.3262"></a> +<span class="sourceLineNo">3263</span> * Atomically apply the given map of family->edits to the memstore.<a name="line.3263"></a> +<span class="sourceLineNo">3264</span> * This handles the consistency control on its own, but the caller<a name="line.3264"></a> +<span class="sourceLineNo">3265</span> * should already have locked updatesLock.readLock(). This also does<a name="line.3265"></a> +<span class="sourceLineNo">3266</span> * <b>not</b> check the families for validity.<a name="line.3266"></a> +<span class="sourceLineNo">3267</span> *<a name="line.3267"></a> +<span class="sourceLineNo">3268</span> * @param familyMap Map of Cells by family<a name="line.3268"></a> +<span class="sourceLineNo">3269</span> */<a name="line.3269"></a> +<span class="sourceLineNo">3270</span> protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,<a name="line.3270"></a> +<span class="sourceLineNo">3271</span> MemStoreSizing memstoreAccounting) throws IOException {<a name="line.3271"></a> +<span class="sourceLineNo">3272</span> for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {<a name="line.3272"></a> +<span class="sourceLineNo">3273</span> byte[] family = e.getKey();<a name="line.3273"></a> +<span class="sourceLineNo">3274</span> List<Cell> cells = e.getValue();<a name="line.3274"></a> +<span class="sourceLineNo">3275</span> assert cells instanceof RandomAccess;<a name="line.3275"></a> +<span class="sourceLineNo">3276</span> region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);<a name="line.3276"></a> +<span class="sourceLineNo">3277</span> }<a name="line.3277"></a> +<span class="sourceLineNo">3278</span> }<a name="line.3278"></a> +<span class="sourceLineNo">3279</span><a name="line.3279"></a> +<span class="sourceLineNo">3280</span> /**<a name="line.3280"></a> +<span class="sourceLineNo">3281</span> * Append the given map of family->edits to a WALEdit data structure.<a name="line.3281"></a> +<span class="sourceLineNo">3282</span> * This does not write to the WAL itself.<a name="line.3282"></a> +<span class="sourceLineNo">3283</span> * @param familyMap map of family->edits<a name="line.3283"></a> +<span class="sourceLineNo">3284</span> * @param walEdit the destination entry to append into<a name="line.3284"></a> +<span class="sourceLineNo">3285</span> */<a name="line.3285"></a> +<span class="sourceLineNo">3286</span> private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,<a name="line.3286"></a> +<span class="sourceLineNo">3287</span> WALEdit walEdit) {<a name="line.3287"></a> +<span class="sourceLineNo">3288</span> for (List<Cell> edits : familyMap.values()) {<a name="line.3288"></a> +<span class="sourceLineNo">3289</span> assert edits instanceof RandomAccess;<a name="line.3289"></a> +<span class="sourceLineNo">3290</span> int listSize = edits.size();<a name="line.3290"></a> +<span class="sourceLineNo">3291</span> for (int i=0; i < listSize; i++) {<a name="line.3291"></a> +<span class="sourceLineNo">3292</span> Cell cell = edits.get(i);<a name="line.3292"></a> +<span class="sourceLineNo">3293</span> walEdit.add(cell);<a name="line.3293"></a> +<span class="sourceLineNo">3294</span> }<a name="line.3294"></a> +<span class="sourceLineNo">3295</span> }<a name="line.3295"></a> +<span class="sourceLineNo">3296</span> }<a name="line.3296"></a> +<span class="sourceLineNo">3297</span> }<a name="line.3297"></a> +<span class="sourceLineNo">3298</span><a name="line.3298"></a> +<span class="sourceLineNo">3299</span> /**<a name="line.3299"></a> +<span class="sourceLineNo">3300</span> * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most<a name="line.3300"></a> +<span class="sourceLineNo">3301</span> * of the logic is same.<a name="line.3301"></a> +<span class="sourceLineNo">3302</span> */<a name="line.3302"></a> +<span class="sourceLineNo">3303</span> static class MutationBatchOperation extends BatchOperation<Mutation> {<a name="line.3303"></a> +<span class="sourceLineNo">3304</span> private long nonceGroup;<a name="line.3304"></a> +<span class="sourceLineNo">3305</span> private long nonce;<a name="line.3305"></a> +<span class="sourceLineNo">3306</span> public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,<a name="line.3306"></a> +<span class="sourceLineNo">3307</span> long nonceGroup, long nonce) {<a name="line.3307"></a> +<span class="sourceLineNo">3308</span> super(region, operations);<a name="line.3308"></a> +<span class="sourceLineNo">3309</span> this.atomic = atomic;<a name="line.3309"></a> +<span class="sourceLineNo">3310</span> this.nonceGroup = nonceGroup;<a name="line.3310"></a> +<span class="sourceLineNo">3311</span> this.nonce = nonce;<a name="line.3311"></a> +<span class="sourceLineNo">3312</span> }<a name="line.3312"></a> +<span class="sourceLineNo">3313</span><a name="line.3313"></a> +<span class="sourceLineNo">3314</span> @Override<a name="line.3314"></a> +<span class="sourceLineNo">3315</span> public Mutation getMutation(int index) {<a name="line.3315"></a> +<span class="sourceLineNo">3316</span> return this.operations[index];<a name="line.3316"></a> +<span class="sourceLineNo">3317</span> }<a name="line.3317"></a> +<span class="sourceLineNo">3318</span><a name="line.3318"></a> +<span class="sourceLineNo">3319</span> @Override<a name="line.3319"></a> +<span class="sourceLineNo">3320</span> public long getNonceGroup(int index) {<a name="line.3320"></a> +<span class="sourceLineNo">3321</span> return nonceGroup;<a name="line.3321"></a> +<span class="sourceLineNo">3322</span> }<a name="line.3322"></a> +<span class="sourceLineNo">3323</span><a name="line.3323"></a> +<span class="sourceLineNo">3324</span> @Override<a name="line.3324"></a> +<span class="sourceLineNo">3325</span> public long getNonce(int index) {<a name="line.3325"></a> +<span class="sourceLineNo">3326</span> return nonce;<a name="line.3326"></a> +<span class="sourceLineNo">3327</span> }<a name="line.3327"></a> +<span class="sourceLineNo">3328</span><a name="line.3328"></a> +<span class="sourceLineNo">3329</span> @Override<a name="line.3329"></a> +<span class="sourceLineNo">3330</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3330"></a> +<span class="sourceLineNo">3331</span> return this.operations;<a name="line.3331"></a> +<span class="sourceLineNo">3332</span> }<a name="line.3332"></a> +<span class="sourceLineNo">3333</span><a name="line.3333"></a> +<span class="sourceLineNo">3334</span> @Override<a name="line.3334"></a> +<span class="sourceLineNo">3335</span> public boolean isInReplay() {<a name="line.3335"></a> +<span class="sourceLineNo">3336</span> return false;<a name="line.3336"></a> +<span class="sourceLineNo">3337</span> }<a name="line.3337"></a> +<span class="sourceLineNo">3338</span><a name="line.3338"></a> +<span class="sourceLineNo">3339</span> @Override<a name="line.3339"></a> +<span class="sourceLineNo">3340</span> public long getOrigLogSeqNum() {<a name="line.3340"></a> +<span class="sourceLineNo">3341</span> return WALKey.NO_SEQUENCE_ID;<a name="line.3341"></a> +<span class="sourceLineNo">3342</span> }<a name="line.3342"></a> +<span class="sourceLineNo">3343</span><a name="line.3343"></a> +<span class="sourceLineNo">3344</span> @Override<a name="line.3344"></a> +<span class="sourceLineNo">3345</span> public void startRegionOperation() throws IOException {<a name="line.3345"></a> +<span class="sourceLineNo">3346</span> region.startRegionOperation(Operation.BATCH_MUTATE);<a name="line.3346"></a> +<span class="sourceLineNo">3347</span> }<a name="line.3347"></a> +<span class="sourceLineNo">3348</span><a name="line.3348"></a> +<span class="sourceLineNo">3349</span> @Override<a name="line.3349"></a> +<span class="sourceLineNo">3350</span> public void closeRegionOperation() throws IOException {<a name="line.3350"></a> +<span class="sourceLineNo">3351</span> region.closeRegionOperation(Operation.BATCH_MUTATE);<a name="line.3351"></a> +<span class="sourceLineNo">3352</span> }<a name="line.3352"></a> +<span class="sourceLineNo">3353</span><a name="line.3353"></a> +<span class="sourceLineNo">3354</span> @Override<a name="line.3354"></a> +<span class="sourceLineNo">3355</span> public void checkAndPreparePut(Put p) throws IOException {<a name="line.3355"></a> +<span class="sourceLineNo">3356</span> region.checkFamilies(p.getFamilyCellMap().keySet());<a name="line.3356"></a> +<span class="sourceLineNo">3357</span> }<a name="line.3357"></a> +<span class="sourceLineNo">3358</span><a name="line.3358"></a> +<span class="sourceLineNo">3359</span> @Override<a name="line.3359"></a> +<span class="sourceLineNo">3360</span> public void checkAndPrepare() throws IOException {<a name="line.3360"></a> +<span class="sourceLineNo">3361</span> final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes<a name="line.3361"></a> +<span class="sourceLineNo">3362</span> visitBatchOperations(true, this.size(), new Visitor() {<a name="line.3362"></a> +<span class="sourceLineNo">3363</span> private long now = EnvironmentEdgeManager.currentTime();<a name="line.3363"></a> +<span class="sourceLineNo">3364</span> private WALEdit walEdit;<a name="line.3364"></a> +<span class="sourceLineNo">3365</span> @Override<a name="line.3365"></a> +<span class="sourceLineNo">3366</span> public boolean visit(int index) throws IOException {<a name="line.3366"></a> +<span class="sourceLineNo">3367</span> // Run coprocessor pre hook outside of locks to avoid deadlock<a n
<TRUNCATED>