http://git-wip-us.apache.org/repos/asf/hbase-site/blob/8bb348c6/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html index fbdde18..1d64963 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BatchOperation.html @@ -2959,7 +2959,7 @@ <span class="sourceLineNo">2951</span> Set<byte[]> deletesCfSet = null;<a name="line.2951"></a> <span class="sourceLineNo">2952</span> long currentNonceGroup = HConstants.NO_NONCE;<a name="line.2952"></a> <span class="sourceLineNo">2953</span> long currentNonce = HConstants.NO_NONCE;<a name="line.2953"></a> -<span class="sourceLineNo">2954</span> WALEdit walEdit = new WALEdit(replay);<a name="line.2954"></a> +<span class="sourceLineNo">2954</span> WALEdit walEdit = null;<a name="line.2954"></a> <span class="sourceLineNo">2955</span> boolean locked = false;<a name="line.2955"></a> <span class="sourceLineNo">2956</span> // reference family maps directly so coprocessors can mutate them if desired<a name="line.2956"></a> <span class="sourceLineNo">2957</span> Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];<a name="line.2957"></a> @@ -2970,4878 +2970,4890 @@ <span class="sourceLineNo">2962</span> int noOfPuts = 0;<a name="line.2962"></a> <span class="sourceLineNo">2963</span> int noOfDeletes = 0;<a name="line.2963"></a> <span class="sourceLineNo">2964</span> WriteEntry writeEntry = null;<a name="line.2964"></a> -<span class="sourceLineNo">2965</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.2965"></a> -<span class="sourceLineNo">2966</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.2966"></a> -<span class="sourceLineNo">2967</span> try {<a name="line.2967"></a> -<span class="sourceLineNo">2968</span> // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.<a name="line.2968"></a> -<span class="sourceLineNo">2969</span> int numReadyToWrite = 0;<a name="line.2969"></a> -<span class="sourceLineNo">2970</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.2970"></a> -<span class="sourceLineNo">2971</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.2971"></a> -<span class="sourceLineNo">2972</span> if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) {<a name="line.2972"></a> -<span class="sourceLineNo">2973</span> lastIndexExclusive++;<a name="line.2973"></a> -<span class="sourceLineNo">2974</span> continue;<a name="line.2974"></a> -<span class="sourceLineNo">2975</span> }<a name="line.2975"></a> -<span class="sourceLineNo">2976</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.2976"></a> -<span class="sourceLineNo">2977</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.2977"></a> -<span class="sourceLineNo">2978</span> RowLock rowLock = null;<a name="line.2978"></a> -<span class="sourceLineNo">2979</span> try {<a name="line.2979"></a> -<span class="sourceLineNo">2980</span> rowLock = getRowLock(mutation.getRow(), true);<a name="line.2980"></a> -<span class="sourceLineNo">2981</span> } catch (IOException ioe) {<a name="line.2981"></a> -<span class="sourceLineNo">2982</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.2982"></a> -<span class="sourceLineNo">2983</span> }<a name="line.2983"></a> -<span class="sourceLineNo">2984</span> if (rowLock == null) {<a name="line.2984"></a> -<span class="sourceLineNo">2985</span> // We failed to grab another lock<a name="line.2985"></a> -<span class="sourceLineNo">2986</span> break; // Stop acquiring more rows for this batch<a name="line.2986"></a> -<span class="sourceLineNo">2987</span> } else {<a name="line.2987"></a> -<span class="sourceLineNo">2988</span> acquiredRowLocks.add(rowLock);<a name="line.2988"></a> -<span class="sourceLineNo">2989</span> }<a name="line.2989"></a> -<span class="sourceLineNo">2990</span><a name="line.2990"></a> -<span class="sourceLineNo">2991</span> lastIndexExclusive++;<a name="line.2991"></a> -<span class="sourceLineNo">2992</span> numReadyToWrite++;<a name="line.2992"></a> -<span class="sourceLineNo">2993</span><a name="line.2993"></a> -<span class="sourceLineNo">2994</span> if (mutation instanceof Put) {<a name="line.2994"></a> -<span class="sourceLineNo">2995</span> // If Column Families stay consistent through out all of the<a name="line.2995"></a> -<span class="sourceLineNo">2996</span> // individual puts then metrics can be reported as a multiput across<a name="line.2996"></a> -<span class="sourceLineNo">2997</span> // column families in the first put.<a name="line.2997"></a> -<span class="sourceLineNo">2998</span> if (putsCfSet == null) {<a name="line.2998"></a> -<span class="sourceLineNo">2999</span> putsCfSet = mutation.getFamilyCellMap().keySet();<a name="line.2999"></a> -<span class="sourceLineNo">3000</span> } else {<a name="line.3000"></a> -<span class="sourceLineNo">3001</span> putsCfSetConsistent = putsCfSetConsistent<a name="line.3001"></a> -<span class="sourceLineNo">3002</span> && mutation.getFamilyCellMap().keySet().equals(putsCfSet);<a name="line.3002"></a> -<span class="sourceLineNo">3003</span> }<a name="line.3003"></a> -<span class="sourceLineNo">3004</span> } else {<a name="line.3004"></a> -<span class="sourceLineNo">3005</span> if (deletesCfSet == null) {<a name="line.3005"></a> -<span class="sourceLineNo">3006</span> deletesCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3006"></a> -<span class="sourceLineNo">3007</span> } else {<a name="line.3007"></a> -<span class="sourceLineNo">3008</span> deletesCfSetConsistent = deletesCfSetConsistent<a name="line.3008"></a> -<span class="sourceLineNo">3009</span> && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);<a name="line.3009"></a> -<span class="sourceLineNo">3010</span> }<a name="line.3010"></a> -<span class="sourceLineNo">3011</span> }<a name="line.3011"></a> -<span class="sourceLineNo">3012</span> }<a name="line.3012"></a> -<span class="sourceLineNo">3013</span><a name="line.3013"></a> -<span class="sourceLineNo">3014</span> // We've now grabbed as many mutations off the list as we can<a name="line.3014"></a> -<span class="sourceLineNo">3015</span><a name="line.3015"></a> -<span class="sourceLineNo">3016</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3016"></a> -<span class="sourceLineNo">3017</span> // We should record the timestamp only after we have acquired the rowLock,<a name="line.3017"></a> -<span class="sourceLineNo">3018</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3018"></a> -<span class="sourceLineNo">3019</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3019"></a> -<span class="sourceLineNo">3020</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3020"></a> -<span class="sourceLineNo">3021</span><a name="line.3021"></a> -<span class="sourceLineNo">3022</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3022"></a> -<span class="sourceLineNo">3023</span> if (numReadyToWrite <= 0) {<a name="line.3023"></a> -<span class="sourceLineNo">3024</span> return 0L;<a name="line.3024"></a> -<span class="sourceLineNo">3025</span> }<a name="line.3025"></a> +<span class="sourceLineNo">2965</span> int cellCount = 0;<a name="line.2965"></a> +<span class="sourceLineNo">2966</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.2966"></a> +<span class="sourceLineNo">2967</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.2967"></a> +<span class="sourceLineNo">2968</span> try {<a name="line.2968"></a> +<span class="sourceLineNo">2969</span> // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.<a name="line.2969"></a> +<span class="sourceLineNo">2970</span> int numReadyToWrite = 0;<a name="line.2970"></a> +<span class="sourceLineNo">2971</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.2971"></a> +<span class="sourceLineNo">2972</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.2972"></a> +<span class="sourceLineNo">2973</span> if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) {<a name="line.2973"></a> +<span class="sourceLineNo">2974</span> lastIndexExclusive++;<a name="line.2974"></a> +<span class="sourceLineNo">2975</span> continue;<a name="line.2975"></a> +<span class="sourceLineNo">2976</span> }<a name="line.2976"></a> +<span class="sourceLineNo">2977</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.2977"></a> +<span class="sourceLineNo">2978</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.2978"></a> +<span class="sourceLineNo">2979</span> RowLock rowLock = null;<a name="line.2979"></a> +<span class="sourceLineNo">2980</span> try {<a name="line.2980"></a> +<span class="sourceLineNo">2981</span> rowLock = getRowLock(mutation.getRow(), true);<a name="line.2981"></a> +<span class="sourceLineNo">2982</span> } catch (IOException ioe) {<a name="line.2982"></a> +<span class="sourceLineNo">2983</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.2983"></a> +<span class="sourceLineNo">2984</span> }<a name="line.2984"></a> +<span class="sourceLineNo">2985</span> if (rowLock == null) {<a name="line.2985"></a> +<span class="sourceLineNo">2986</span> // We failed to grab another lock<a name="line.2986"></a> +<span class="sourceLineNo">2987</span> break; // Stop acquiring more rows for this batch<a name="line.2987"></a> +<span class="sourceLineNo">2988</span> } else {<a name="line.2988"></a> +<span class="sourceLineNo">2989</span> acquiredRowLocks.add(rowLock);<a name="line.2989"></a> +<span class="sourceLineNo">2990</span> }<a name="line.2990"></a> +<span class="sourceLineNo">2991</span><a name="line.2991"></a> +<span class="sourceLineNo">2992</span> lastIndexExclusive++;<a name="line.2992"></a> +<span class="sourceLineNo">2993</span> numReadyToWrite++;<a name="line.2993"></a> +<span class="sourceLineNo">2994</span> if (replay) {<a name="line.2994"></a> +<span class="sourceLineNo">2995</span> for (List<Cell> cells : mutation.getFamilyCellMap().values()) {<a name="line.2995"></a> +<span class="sourceLineNo">2996</span> cellCount += cells.size();<a name="line.2996"></a> +<span class="sourceLineNo">2997</span> }<a name="line.2997"></a> +<span class="sourceLineNo">2998</span> }<a name="line.2998"></a> +<span class="sourceLineNo">2999</span> if (mutation instanceof Put) {<a name="line.2999"></a> +<span class="sourceLineNo">3000</span> // If Column Families stay consistent through out all of the<a name="line.3000"></a> +<span class="sourceLineNo">3001</span> // individual puts then metrics can be reported as a multiput across<a name="line.3001"></a> +<span class="sourceLineNo">3002</span> // column families in the first put.<a name="line.3002"></a> +<span class="sourceLineNo">3003</span> if (putsCfSet == null) {<a name="line.3003"></a> +<span class="sourceLineNo">3004</span> putsCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3004"></a> +<span class="sourceLineNo">3005</span> } else {<a name="line.3005"></a> +<span class="sourceLineNo">3006</span> putsCfSetConsistent = putsCfSetConsistent<a name="line.3006"></a> +<span class="sourceLineNo">3007</span> && mutation.getFamilyCellMap().keySet().equals(putsCfSet);<a name="line.3007"></a> +<span class="sourceLineNo">3008</span> }<a name="line.3008"></a> +<span class="sourceLineNo">3009</span> } else {<a name="line.3009"></a> +<span class="sourceLineNo">3010</span> if (deletesCfSet == null) {<a name="line.3010"></a> +<span class="sourceLineNo">3011</span> deletesCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3011"></a> +<span class="sourceLineNo">3012</span> } else {<a name="line.3012"></a> +<span class="sourceLineNo">3013</span> deletesCfSetConsistent = deletesCfSetConsistent<a name="line.3013"></a> +<span class="sourceLineNo">3014</span> && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);<a name="line.3014"></a> +<span class="sourceLineNo">3015</span> }<a name="line.3015"></a> +<span class="sourceLineNo">3016</span> }<a name="line.3016"></a> +<span class="sourceLineNo">3017</span> }<a name="line.3017"></a> +<span class="sourceLineNo">3018</span><a name="line.3018"></a> +<span class="sourceLineNo">3019</span> // We've now grabbed as many mutations off the list as we can<a name="line.3019"></a> +<span class="sourceLineNo">3020</span><a name="line.3020"></a> +<span class="sourceLineNo">3021</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3021"></a> +<span class="sourceLineNo">3022</span> // We should record the timestamp only after we have acquired the rowLock,<a name="line.3022"></a> +<span class="sourceLineNo">3023</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3023"></a> +<span class="sourceLineNo">3024</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3024"></a> +<span class="sourceLineNo">3025</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3025"></a> <span class="sourceLineNo">3026</span><a name="line.3026"></a> -<span class="sourceLineNo">3027</span> for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {<a name="line.3027"></a> -<span class="sourceLineNo">3028</span> // skip invalid<a name="line.3028"></a> -<span class="sourceLineNo">3029</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3029"></a> -<span class="sourceLineNo">3030</span> != OperationStatusCode.NOT_RUN) {<a name="line.3030"></a> -<span class="sourceLineNo">3031</span> // lastIndexExclusive was incremented above.<a name="line.3031"></a> -<span class="sourceLineNo">3032</span> continue;<a name="line.3032"></a> -<span class="sourceLineNo">3033</span> }<a name="line.3033"></a> -<span class="sourceLineNo">3034</span><a name="line.3034"></a> -<span class="sourceLineNo">3035</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3035"></a> -<span class="sourceLineNo">3036</span> if (mutation instanceof Put) {<a name="line.3036"></a> -<span class="sourceLineNo">3037</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3037"></a> -<span class="sourceLineNo">3038</span> noOfPuts++;<a name="line.3038"></a> -<span class="sourceLineNo">3039</span> } else {<a name="line.3039"></a> -<span class="sourceLineNo">3040</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3040"></a> -<span class="sourceLineNo">3041</span> noOfDeletes++;<a name="line.3041"></a> -<span class="sourceLineNo">3042</span> }<a name="line.3042"></a> -<span class="sourceLineNo">3043</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3043"></a> -<span class="sourceLineNo">3044</span> }<a name="line.3044"></a> -<span class="sourceLineNo">3045</span><a name="line.3045"></a> -<span class="sourceLineNo">3046</span> lock(this.updatesLock.readLock(), numReadyToWrite);<a name="line.3046"></a> -<span class="sourceLineNo">3047</span> locked = true;<a name="line.3047"></a> -<span class="sourceLineNo">3048</span><a name="line.3048"></a> -<span class="sourceLineNo">3049</span> // calling the pre CP hook for batch mutation<a name="line.3049"></a> -<span class="sourceLineNo">3050</span> if (!replay && coprocessorHost != null) {<a name="line.3050"></a> -<span class="sourceLineNo">3051</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3051"></a> -<span class="sourceLineNo">3052</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3052"></a> -<span class="sourceLineNo">3053</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3053"></a> -<span class="sourceLineNo">3054</span> if (coprocessorHost.preBatchMutate(miniBatchOp)) {<a name="line.3054"></a> -<span class="sourceLineNo">3055</span> return 0L;<a name="line.3055"></a> -<span class="sourceLineNo">3056</span> }<a name="line.3056"></a> -<span class="sourceLineNo">3057</span> }<a name="line.3057"></a> -<span class="sourceLineNo">3058</span><a name="line.3058"></a> -<span class="sourceLineNo">3059</span> // STEP 3. Build WAL edit<a name="line.3059"></a> -<span class="sourceLineNo">3060</span> Durability durability = Durability.USE_DEFAULT;<a name="line.3060"></a> -<span class="sourceLineNo">3061</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3061"></a> -<span class="sourceLineNo">3062</span> // Skip puts that were determined to be invalid during preprocessing<a name="line.3062"></a> -<span class="sourceLineNo">3063</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3063"></a> -<span class="sourceLineNo">3064</span> continue;<a name="line.3064"></a> -<span class="sourceLineNo">3065</span> }<a name="line.3065"></a> -<span class="sourceLineNo">3066</span><a name="line.3066"></a> -<span class="sourceLineNo">3067</span> Mutation m = batchOp.getMutation(i);<a name="line.3067"></a> -<span class="sourceLineNo">3068</span> Durability tmpDur = getEffectiveDurability(m.getDurability());<a name="line.3068"></a> -<span class="sourceLineNo">3069</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3069"></a> -<span class="sourceLineNo">3070</span> durability = tmpDur;<a name="line.3070"></a> -<span class="sourceLineNo">3071</span> }<a name="line.3071"></a> -<span class="sourceLineNo">3072</span> if (tmpDur == Durability.SKIP_WAL) {<a name="line.3072"></a> -<span class="sourceLineNo">3073</span> recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3073"></a> -<span class="sourceLineNo">3074</span> continue;<a name="line.3074"></a> -<span class="sourceLineNo">3075</span> }<a name="line.3075"></a> -<span class="sourceLineNo">3076</span><a name="line.3076"></a> -<span class="sourceLineNo">3077</span> long nonceGroup = batchOp.getNonceGroup(i);<a name="line.3077"></a> -<span class="sourceLineNo">3078</span> long nonce = batchOp.getNonce(i);<a name="line.3078"></a> -<span class="sourceLineNo">3079</span> // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.<a name="line.3079"></a> -<span class="sourceLineNo">3080</span> // Given how nonces are originally written, these should be contiguous.<a name="line.3080"></a> -<span class="sourceLineNo">3081</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3081"></a> -<span class="sourceLineNo">3082</span> if (nonceGroup != currentNonceGroup || nonce != currentNonce) {<a name="line.3082"></a> -<span class="sourceLineNo">3083</span> // Write what we have so far for nonces out to WAL<a name="line.3083"></a> -<span class="sourceLineNo">3084</span> appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);<a name="line.3084"></a> -<span class="sourceLineNo">3085</span> walEdit = new WALEdit(replay);<a name="line.3085"></a> -<span class="sourceLineNo">3086</span> currentNonceGroup = nonceGroup;<a name="line.3086"></a> -<span class="sourceLineNo">3087</span> currentNonce = nonce;<a name="line.3087"></a> -<span class="sourceLineNo">3088</span> }<a name="line.3088"></a> -<span class="sourceLineNo">3089</span><a name="line.3089"></a> -<span class="sourceLineNo">3090</span> // Add WAL edits by CP<a name="line.3090"></a> -<span class="sourceLineNo">3091</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3091"></a> -<span class="sourceLineNo">3092</span> if (fromCP != null) {<a name="line.3092"></a> -<span class="sourceLineNo">3093</span> for (Cell cell : fromCP.getCells()) {<a name="line.3093"></a> -<span class="sourceLineNo">3094</span> walEdit.add(cell);<a name="line.3094"></a> -<span class="sourceLineNo">3095</span> }<a name="line.3095"></a> -<span class="sourceLineNo">3096</span> }<a name="line.3096"></a> -<span class="sourceLineNo">3097</span> addFamilyMapToWALEdit(familyMaps[i], walEdit);<a name="line.3097"></a> -<span class="sourceLineNo">3098</span> }<a name="line.3098"></a> -<span class="sourceLineNo">3099</span><a name="line.3099"></a> -<span class="sourceLineNo">3100</span> // STEP 4. Append the final edit to WAL and sync.<a name="line.3100"></a> -<span class="sourceLineNo">3101</span> Mutation mutation = batchOp.getMutation(firstIndex);<a name="line.3101"></a> -<span class="sourceLineNo">3102</span> WALKey walKey = null;<a name="line.3102"></a> -<span class="sourceLineNo">3103</span> if (replay) {<a name="line.3103"></a> -<span class="sourceLineNo">3104</span> // use wal key from the original<a name="line.3104"></a> -<span class="sourceLineNo">3105</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3105"></a> -<span class="sourceLineNo">3106</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3106"></a> -<span class="sourceLineNo">3107</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3107"></a> -<span class="sourceLineNo">3108</span> walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());<a name="line.3108"></a> -<span class="sourceLineNo">3109</span> }<a name="line.3109"></a> -<span class="sourceLineNo">3110</span> // Not sure what is going on here when replay is going on... does the below append get<a name="line.3110"></a> -<span class="sourceLineNo">3111</span> // called for replayed edits? Am afraid to change it without test.<a name="line.3111"></a> -<span class="sourceLineNo">3112</span> if (!walEdit.isEmpty()) {<a name="line.3112"></a> -<span class="sourceLineNo">3113</span> if (!replay) {<a name="line.3113"></a> -<span class="sourceLineNo">3114</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3114"></a> -<span class="sourceLineNo">3115</span> walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3115"></a> -<span class="sourceLineNo">3116</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3116"></a> -<span class="sourceLineNo">3117</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3117"></a> -<span class="sourceLineNo">3118</span> }<a name="line.3118"></a> -<span class="sourceLineNo">3119</span> // TODO: Use the doAppend methods below... complicated by the replay stuff above.<a name="line.3119"></a> -<span class="sourceLineNo">3120</span> try {<a name="line.3120"></a> -<span class="sourceLineNo">3121</span> long txid =<a name="line.3121"></a> -<span class="sourceLineNo">3122</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3122"></a> -<span class="sourceLineNo">3123</span> if (txid != 0) sync(txid, durability);<a name="line.3123"></a> -<span class="sourceLineNo">3124</span> writeEntry = walKey.getWriteEntry();<a name="line.3124"></a> -<span class="sourceLineNo">3125</span> } catch (IOException ioe) {<a name="line.3125"></a> -<span class="sourceLineNo">3126</span> if (walKey != null) mvcc.complete(walKey.getWriteEntry());<a name="line.3126"></a> -<span class="sourceLineNo">3127</span> throw ioe;<a name="line.3127"></a> -<span class="sourceLineNo">3128</span> }<a name="line.3128"></a> -<span class="sourceLineNo">3129</span> }<a name="line.3129"></a> -<span class="sourceLineNo">3130</span> if (walKey == null) {<a name="line.3130"></a> -<span class="sourceLineNo">3131</span> // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid.<a name="line.3131"></a> -<span class="sourceLineNo">3132</span> writeEntry = mvcc.begin();<a name="line.3132"></a> -<span class="sourceLineNo">3133</span> }<a name="line.3133"></a> -<span class="sourceLineNo">3134</span><a name="line.3134"></a> -<span class="sourceLineNo">3135</span> // STEP 5. Write back to memstore<a name="line.3135"></a> -<span class="sourceLineNo">3136</span> long addedSize = 0;<a name="line.3136"></a> -<span class="sourceLineNo">3137</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3137"></a> -<span class="sourceLineNo">3138</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3138"></a> -<span class="sourceLineNo">3139</span> continue;<a name="line.3139"></a> +<span class="sourceLineNo">3027</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3027"></a> +<span class="sourceLineNo">3028</span> if (numReadyToWrite <= 0) {<a name="line.3028"></a> +<span class="sourceLineNo">3029</span> return 0L;<a name="line.3029"></a> +<span class="sourceLineNo">3030</span> }<a name="line.3030"></a> +<span class="sourceLineNo">3031</span><a name="line.3031"></a> +<span class="sourceLineNo">3032</span> for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {<a name="line.3032"></a> +<span class="sourceLineNo">3033</span> // skip invalid<a name="line.3033"></a> +<span class="sourceLineNo">3034</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3034"></a> +<span class="sourceLineNo">3035</span> != OperationStatusCode.NOT_RUN) {<a name="line.3035"></a> +<span class="sourceLineNo">3036</span> // lastIndexExclusive was incremented above.<a name="line.3036"></a> +<span class="sourceLineNo">3037</span> continue;<a name="line.3037"></a> +<span class="sourceLineNo">3038</span> }<a name="line.3038"></a> +<span class="sourceLineNo">3039</span><a name="line.3039"></a> +<span class="sourceLineNo">3040</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3040"></a> +<span class="sourceLineNo">3041</span> if (mutation instanceof Put) {<a name="line.3041"></a> +<span class="sourceLineNo">3042</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3042"></a> +<span class="sourceLineNo">3043</span> noOfPuts++;<a name="line.3043"></a> +<span class="sourceLineNo">3044</span> } else {<a name="line.3044"></a> +<span class="sourceLineNo">3045</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3045"></a> +<span class="sourceLineNo">3046</span> noOfDeletes++;<a name="line.3046"></a> +<span class="sourceLineNo">3047</span> }<a name="line.3047"></a> +<span class="sourceLineNo">3048</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3048"></a> +<span class="sourceLineNo">3049</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3049"></a> +<span class="sourceLineNo">3050</span> if (fromCP != null) {<a name="line.3050"></a> +<span class="sourceLineNo">3051</span> cellCount += fromCP.size();<a name="line.3051"></a> +<span class="sourceLineNo">3052</span> }<a name="line.3052"></a> +<span class="sourceLineNo">3053</span> for (List<Cell> cells : familyMaps[i].values()) {<a name="line.3053"></a> +<span class="sourceLineNo">3054</span> cellCount += cells.size();<a name="line.3054"></a> +<span class="sourceLineNo">3055</span> }<a name="line.3055"></a> +<span class="sourceLineNo">3056</span> }<a name="line.3056"></a> +<span class="sourceLineNo">3057</span> walEdit = new WALEdit(cellCount, replay);<a name="line.3057"></a> +<span class="sourceLineNo">3058</span> lock(this.updatesLock.readLock(), numReadyToWrite);<a name="line.3058"></a> +<span class="sourceLineNo">3059</span> locked = true;<a name="line.3059"></a> +<span class="sourceLineNo">3060</span><a name="line.3060"></a> +<span class="sourceLineNo">3061</span> // calling the pre CP hook for batch mutation<a name="line.3061"></a> +<span class="sourceLineNo">3062</span> if (!replay && coprocessorHost != null) {<a name="line.3062"></a> +<span class="sourceLineNo">3063</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3063"></a> +<span class="sourceLineNo">3064</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3064"></a> +<span class="sourceLineNo">3065</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3065"></a> +<span class="sourceLineNo">3066</span> if (coprocessorHost.preBatchMutate(miniBatchOp)) {<a name="line.3066"></a> +<span class="sourceLineNo">3067</span> return 0L;<a name="line.3067"></a> +<span class="sourceLineNo">3068</span> }<a name="line.3068"></a> +<span class="sourceLineNo">3069</span> }<a name="line.3069"></a> +<span class="sourceLineNo">3070</span><a name="line.3070"></a> +<span class="sourceLineNo">3071</span> // STEP 3. Build WAL edit<a name="line.3071"></a> +<span class="sourceLineNo">3072</span> Durability durability = Durability.USE_DEFAULT;<a name="line.3072"></a> +<span class="sourceLineNo">3073</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3073"></a> +<span class="sourceLineNo">3074</span> // Skip puts that were determined to be invalid during preprocessing<a name="line.3074"></a> +<span class="sourceLineNo">3075</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3075"></a> +<span class="sourceLineNo">3076</span> continue;<a name="line.3076"></a> +<span class="sourceLineNo">3077</span> }<a name="line.3077"></a> +<span class="sourceLineNo">3078</span><a name="line.3078"></a> +<span class="sourceLineNo">3079</span> Mutation m = batchOp.getMutation(i);<a name="line.3079"></a> +<span class="sourceLineNo">3080</span> Durability tmpDur = getEffectiveDurability(m.getDurability());<a name="line.3080"></a> +<span class="sourceLineNo">3081</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3081"></a> +<span class="sourceLineNo">3082</span> durability = tmpDur;<a name="line.3082"></a> +<span class="sourceLineNo">3083</span> }<a name="line.3083"></a> +<span class="sourceLineNo">3084</span> if (tmpDur == Durability.SKIP_WAL) {<a name="line.3084"></a> +<span class="sourceLineNo">3085</span> recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3085"></a> +<span class="sourceLineNo">3086</span> continue;<a name="line.3086"></a> +<span class="sourceLineNo">3087</span> }<a name="line.3087"></a> +<span class="sourceLineNo">3088</span><a name="line.3088"></a> +<span class="sourceLineNo">3089</span> long nonceGroup = batchOp.getNonceGroup(i);<a name="line.3089"></a> +<span class="sourceLineNo">3090</span> long nonce = batchOp.getNonce(i);<a name="line.3090"></a> +<span class="sourceLineNo">3091</span> // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.<a name="line.3091"></a> +<span class="sourceLineNo">3092</span> // Given how nonces are originally written, these should be contiguous.<a name="line.3092"></a> +<span class="sourceLineNo">3093</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3093"></a> +<span class="sourceLineNo">3094</span> if (nonceGroup != currentNonceGroup || nonce != currentNonce) {<a name="line.3094"></a> +<span class="sourceLineNo">3095</span> // Write what we have so far for nonces out to WAL<a name="line.3095"></a> +<span class="sourceLineNo">3096</span> appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);<a name="line.3096"></a> +<span class="sourceLineNo">3097</span> walEdit = new WALEdit(cellCount, replay);<a name="line.3097"></a> +<span class="sourceLineNo">3098</span> currentNonceGroup = nonceGroup;<a name="line.3098"></a> +<span class="sourceLineNo">3099</span> currentNonce = nonce;<a name="line.3099"></a> +<span class="sourceLineNo">3100</span> }<a name="line.3100"></a> +<span class="sourceLineNo">3101</span><a name="line.3101"></a> +<span class="sourceLineNo">3102</span> // Add WAL edits by CP<a name="line.3102"></a> +<span class="sourceLineNo">3103</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3103"></a> +<span class="sourceLineNo">3104</span> if (fromCP != null) {<a name="line.3104"></a> +<span class="sourceLineNo">3105</span> for (Cell cell : fromCP.getCells()) {<a name="line.3105"></a> +<span class="sourceLineNo">3106</span> walEdit.add(cell);<a name="line.3106"></a> +<span class="sourceLineNo">3107</span> }<a name="line.3107"></a> +<span class="sourceLineNo">3108</span> }<a name="line.3108"></a> +<span class="sourceLineNo">3109</span> addFamilyMapToWALEdit(familyMaps[i], walEdit);<a name="line.3109"></a> +<span class="sourceLineNo">3110</span> }<a name="line.3110"></a> +<span class="sourceLineNo">3111</span><a name="line.3111"></a> +<span class="sourceLineNo">3112</span> // STEP 4. Append the final edit to WAL and sync.<a name="line.3112"></a> +<span class="sourceLineNo">3113</span> Mutation mutation = batchOp.getMutation(firstIndex);<a name="line.3113"></a> +<span class="sourceLineNo">3114</span> WALKey walKey = null;<a name="line.3114"></a> +<span class="sourceLineNo">3115</span> if (replay) {<a name="line.3115"></a> +<span class="sourceLineNo">3116</span> // use wal key from the original<a name="line.3116"></a> +<span class="sourceLineNo">3117</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3117"></a> +<span class="sourceLineNo">3118</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3118"></a> +<span class="sourceLineNo">3119</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3119"></a> +<span class="sourceLineNo">3120</span> walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());<a name="line.3120"></a> +<span class="sourceLineNo">3121</span> }<a name="line.3121"></a> +<span class="sourceLineNo">3122</span> // Not sure what is going on here when replay is going on... does the below append get<a name="line.3122"></a> +<span class="sourceLineNo">3123</span> // called for replayed edits? Am afraid to change it without test.<a name="line.3123"></a> +<span class="sourceLineNo">3124</span> if (!walEdit.isEmpty()) {<a name="line.3124"></a> +<span class="sourceLineNo">3125</span> if (!replay) {<a name="line.3125"></a> +<span class="sourceLineNo">3126</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3126"></a> +<span class="sourceLineNo">3127</span> walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3127"></a> +<span class="sourceLineNo">3128</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3128"></a> +<span class="sourceLineNo">3129</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3129"></a> +<span class="sourceLineNo">3130</span> }<a name="line.3130"></a> +<span class="sourceLineNo">3131</span> // TODO: Use the doAppend methods below... complicated by the replay stuff above.<a name="line.3131"></a> +<span class="sourceLineNo">3132</span> try {<a name="line.3132"></a> +<span class="sourceLineNo">3133</span> long txid =<a name="line.3133"></a> +<span class="sourceLineNo">3134</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3134"></a> +<span class="sourceLineNo">3135</span> if (txid != 0) sync(txid, durability);<a name="line.3135"></a> +<span class="sourceLineNo">3136</span> writeEntry = walKey.getWriteEntry();<a name="line.3136"></a> +<span class="sourceLineNo">3137</span> } catch (IOException ioe) {<a name="line.3137"></a> +<span class="sourceLineNo">3138</span> if (walKey != null) mvcc.complete(walKey.getWriteEntry());<a name="line.3138"></a> +<span class="sourceLineNo">3139</span> throw ioe;<a name="line.3139"></a> <span class="sourceLineNo">3140</span> }<a name="line.3140"></a> -<span class="sourceLineNo">3141</span> addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,<a name="line.3141"></a> -<span class="sourceLineNo">3142</span> replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());<a name="line.3142"></a> -<span class="sourceLineNo">3143</span> }<a name="line.3143"></a> -<span class="sourceLineNo">3144</span><a name="line.3144"></a> -<span class="sourceLineNo">3145</span> // STEP 6. Complete mvcc.<a name="line.3145"></a> -<span class="sourceLineNo">3146</span> if (replay) {<a name="line.3146"></a> -<span class="sourceLineNo">3147</span> this.mvcc.advanceTo(batchOp.getReplaySequenceId());<a name="line.3147"></a> -<span class="sourceLineNo">3148</span> } else if (writeEntry != null/*Can be null if in replay mode*/) {<a name="line.3148"></a> -<span class="sourceLineNo">3149</span> mvcc.completeAndWait(writeEntry);<a name="line.3149"></a> -<span class="sourceLineNo">3150</span> writeEntry = null;<a name="line.3150"></a> -<span class="sourceLineNo">3151</span> }<a name="line.3151"></a> -<span class="sourceLineNo">3152</span><a name="line.3152"></a> -<span class="sourceLineNo">3153</span> // STEP 7. Release row locks, etc.<a name="line.3153"></a> -<span class="sourceLineNo">3154</span> if (locked) {<a name="line.3154"></a> -<span class="sourceLineNo">3155</span> this.updatesLock.readLock().unlock();<a name="line.3155"></a> -<span class="sourceLineNo">3156</span> locked = false;<a name="line.3156"></a> -<span class="sourceLineNo">3157</span> }<a name="line.3157"></a> -<span class="sourceLineNo">3158</span> releaseRowLocks(acquiredRowLocks);<a name="line.3158"></a> -<span class="sourceLineNo">3159</span><a name="line.3159"></a> -<span class="sourceLineNo">3160</span> // calling the post CP hook for batch mutation<a name="line.3160"></a> -<span class="sourceLineNo">3161</span> if (!replay && coprocessorHost != null) {<a name="line.3161"></a> -<span class="sourceLineNo">3162</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3162"></a> -<span class="sourceLineNo">3163</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3163"></a> -<span class="sourceLineNo">3164</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3164"></a> -<span class="sourceLineNo">3165</span> coprocessorHost.postBatchMutate(miniBatchOp);<a name="line.3165"></a> -<span class="sourceLineNo">3166</span> }<a name="line.3166"></a> -<span class="sourceLineNo">3167</span><a name="line.3167"></a> -<span class="sourceLineNo">3168</span> for (int i = firstIndex; i < lastIndexExclusive; i ++) {<a name="line.3168"></a> -<span class="sourceLineNo">3169</span> if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {<a name="line.3169"></a> -<span class="sourceLineNo">3170</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3170"></a> -<span class="sourceLineNo">3171</span> }<a name="line.3171"></a> -<span class="sourceLineNo">3172</span> }<a name="line.3172"></a> -<span class="sourceLineNo">3173</span><a name="line.3173"></a> -<span class="sourceLineNo">3174</span> // STEP 8. Run coprocessor post hooks. This should be done after the wal is<a name="line.3174"></a> -<span class="sourceLineNo">3175</span> // synced so that the coprocessor contract is adhered to.<a name="line.3175"></a> -<span class="sourceLineNo">3176</span> if (!replay && coprocessorHost != null) {<a name="line.3176"></a> -<span class="sourceLineNo">3177</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3177"></a> -<span class="sourceLineNo">3178</span> // only for successful puts<a name="line.3178"></a> -<span class="sourceLineNo">3179</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3179"></a> -<span class="sourceLineNo">3180</span> != OperationStatusCode.SUCCESS) {<a name="line.3180"></a> -<span class="sourceLineNo">3181</span> continue;<a name="line.3181"></a> -<span class="sourceLineNo">3182</span> }<a name="line.3182"></a> -<span class="sourceLineNo">3183</span> Mutation m = batchOp.getMutation(i);<a name="line.3183"></a> -<span class="sourceLineNo">3184</span> if (m instanceof Put) {<a name="line.3184"></a> -<span class="sourceLineNo">3185</span> coprocessorHost.postPut((Put) m, walEdit, m.getDurability());<a name="line.3185"></a> -<span class="sourceLineNo">3186</span> } else {<a name="line.3186"></a> -<span class="sourceLineNo">3187</span> coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());<a name="line.3187"></a> -<span class="sourceLineNo">3188</span> }<a name="line.3188"></a> -<span class="sourceLineNo">3189</span> }<a name="line.3189"></a> -<span class="sourceLineNo">3190</span> }<a name="line.3190"></a> -<span class="sourceLineNo">3191</span><a name="line.3191"></a> -<span class="sourceLineNo">3192</span> success = true;<a name="line.3192"></a> -<span class="sourceLineNo">3193</span> return addedSize;<a name="line.3193"></a> -<span class="sourceLineNo">3194</span> } finally {<a name="line.3194"></a> -<span class="sourceLineNo">3195</span> // Call complete rather than completeAndWait because we probably had error if walKey != null<a name="line.3195"></a> -<span class="sourceLineNo">3196</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.3196"></a> -<span class="sourceLineNo">3197</span> if (locked) {<a name="line.3197"></a> -<span class="sourceLineNo">3198</span> this.updatesLock.readLock().unlock();<a name="line.3198"></a> -<span class="sourceLineNo">3199</span> }<a name="line.3199"></a> -<span class="sourceLineNo">3200</span> releaseRowLocks(acquiredRowLocks);<a name="line.3200"></a> -<span class="sourceLineNo">3201</span><a name="line.3201"></a> -<span class="sourceLineNo">3202</span> // See if the column families were consistent through the whole thing.<a name="line.3202"></a> -<span class="sourceLineNo">3203</span> // if they were then keep them. If they were not then pass a null.<a name="line.3203"></a> -<span class="sourceLineNo">3204</span> // null will be treated as unknown.<a name="line.3204"></a> -<span class="sourceLineNo">3205</span> // Total time taken might be involving Puts and Deletes.<a name="line.3205"></a> -<span class="sourceLineNo">3206</span> // Split the time for puts and deletes based on the total number of Puts and Deletes.<a name="line.3206"></a> -<span class="sourceLineNo">3207</span><a name="line.3207"></a> -<span class="sourceLineNo">3208</span> if (noOfPuts > 0) {<a name="line.3208"></a> -<span class="sourceLineNo">3209</span> // There were some Puts in the batch.<a name="line.3209"></a> -<span class="sourceLineNo">3210</span> if (this.metricsRegion != null) {<a name="line.3210"></a> -<span class="sourceLineNo">3211</span> this.metricsRegion.updatePut();<a name="line.3211"></a> -<span class="sourceLineNo">3212</span> }<a name="line.3212"></a> -<span class="sourceLineNo">3213</span> }<a name="line.3213"></a> -<span class="sourceLineNo">3214</span> if (noOfDeletes > 0) {<a name="line.3214"></a> -<span class="sourceLineNo">3215</span> // There were some Deletes in the batch.<a name="line.3215"></a> -<span class="sourceLineNo">3216</span> if (this.metricsRegion != null) {<a name="line.3216"></a> -<span class="sourceLineNo">3217</span> this.metricsRegion.updateDelete();<a name="line.3217"></a> -<span class="sourceLineNo">3218</span> }<a name="line.3218"></a> -<span class="sourceLineNo">3219</span> }<a name="line.3219"></a> -<span class="sourceLineNo">3220</span> if (!success) {<a name="line.3220"></a> -<span class="sourceLineNo">3221</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3221"></a> -<span class="sourceLineNo">3222</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {<a name="line.3222"></a> -<span class="sourceLineNo">3223</span> batchOp.retCodeDetails[i] = OperationStatus.FAILURE;<a name="line.3223"></a> -<span class="sourceLineNo">3224</span> }<a name="line.3224"></a> -<span class="sourceLineNo">3225</span> }<a name="line.3225"></a> -<span class="sourceLineNo">3226</span> }<a name="line.3226"></a> -<span class="sourceLineNo">3227</span> if (coprocessorHost != null && !batchOp.isInReplay()) {<a name="line.3227"></a> -<span class="sourceLineNo">3228</span> // call the coprocessor hook to do any finalization steps<a name="line.3228"></a> -<span class="sourceLineNo">3229</span> // after the put is done<a name="line.3229"></a> -<span class="sourceLineNo">3230</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3230"></a> -<span class="sourceLineNo">3231</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3231"></a> -<span class="sourceLineNo">3232</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,<a name="line.3232"></a> -<span class="sourceLineNo">3233</span> lastIndexExclusive);<a name="line.3233"></a> -<span class="sourceLineNo">3234</span> coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);<a name="line.3234"></a> -<span class="sourceLineNo">3235</span> }<a name="line.3235"></a> -<span class="sourceLineNo">3236</span><a name="line.3236"></a> -<span class="sourceLineNo">3237</span> batchOp.nextIndexToProcess = lastIndexExclusive;<a name="line.3237"></a> -<span class="sourceLineNo">3238</span> }<a name="line.3238"></a> -<span class="sourceLineNo">3239</span> }<a name="line.3239"></a> -<span class="sourceLineNo">3240</span><a name="line.3240"></a> -<span class="sourceLineNo">3241</span> private void appendCurrentNonces(final Mutation mutation, final boolean replay,<a name="line.3241"></a> -<span class="sourceLineNo">3242</span> final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)<a name="line.3242"></a> -<span class="sourceLineNo">3243</span> throws IOException {<a name="line.3243"></a> -<span class="sourceLineNo">3244</span> if (walEdit.isEmpty()) return;<a name="line.3244"></a> -<span class="sourceLineNo">3245</span> if (!replay) throw new IOException("Multiple nonces per batch and not in replay");<a name="line.3245"></a> -<span class="sourceLineNo">3246</span> WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3246"></a> -<span class="sourceLineNo">3247</span> this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),<a name="line.3247"></a> -<span class="sourceLineNo">3248</span> currentNonceGroup, currentNonce, mvcc);<a name="line.3248"></a> -<span class="sourceLineNo">3249</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3249"></a> -<span class="sourceLineNo">3250</span> // Complete the mvcc transaction started down in append else it will block others<a name="line.3250"></a> -<span class="sourceLineNo">3251</span> this.mvcc.complete(walKey.getWriteEntry());<a name="line.3251"></a> -<span class="sourceLineNo">3252</span> }<a name="line.3252"></a> -<span class="sourceLineNo">3253</span><a name="line.3253"></a> -<span class="sourceLineNo">3254</span> private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,<a name="line.3254"></a> -<span class="sourceLineNo">3255</span> final Map<byte[], List<Cell>>[] familyMaps, final long now)<a name="line.3255"></a> -<span class="sourceLineNo">3256</span> throws IOException {<a name="line.3256"></a> -<span class="sourceLineNo">3257</span> boolean skip = false;<a name="line.3257"></a> -<span class="sourceLineNo">3258</span> // Skip anything that "ran" already<a name="line.3258"></a> -<span class="sourceLineNo">3259</span> if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()<a name="line.3259"></a> -<span class="sourceLineNo">3260</span> != OperationStatusCode.NOT_RUN) {<a name="line.3260"></a> -<span class="sourceLineNo">3261</span> return true;<a name="line.3261"></a> -<span class="sourceLineNo">3262</span> }<a name="line.3262"></a> -<span class="sourceLineNo">3263</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3263"></a> -<span class="sourceLineNo">3264</span> Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();<a name="line.3264"></a> -<span class="sourceLineNo">3265</span> // store the family map reference to allow for mutations<a name="line.3265"></a> -<span class="sourceLineNo">3266</span> familyMaps[lastIndexExclusive] = familyMap;<a name="line.3266"></a> -<span class="sourceLineNo">3267</span><a name="line.3267"></a> -<span class="sourceLineNo">3268</span> try {<a name="line.3268"></a> -<span class="sourceLineNo">3269</span> if (mutation instanceof Put) {<a name="line.3269"></a> -<span class="sourceLineNo">3270</span> // Check the families in the put. If bad, skip this one.<a name="line.3270"></a> -<span class="sourceLineNo">3271</span> if (batchOp.isInReplay()) {<a name="line.3271"></a> -<span class="sourceLineNo">3272</span> removeNonExistentColumnFamilyForReplay(familyMap);<a name="line.3272"></a> -<span class="sourceLineNo">3273</span> } else {<a name="line.3273"></a> -<span class="sourceLineNo">3274</span> checkFamilies(familyMap.keySet());<a name="line.3274"></a> -<span class="sourceLineNo">3275</span> }<a name="line.3275"></a> -<span class="sourceLineNo">3276</span> checkTimestamps(mutation.getFamilyCellMap(), now);<a name="line.3276"></a> -<span class="sourceLineNo">3277</span> } else {<a name="line.3277"></a> -<span class="sourceLineNo">3278</span> prepareDelete((Delete)mutation);<a name="line.3278"></a> -<span class="sourceLineNo">3279</span> }<a name="line.3279"></a> -<span class="sourceLineNo">3280</span> checkRow(mutation.getRow(), "doMiniBatchMutation");<a name="line.3280"></a> -<span class="sourceLineNo">3281</span> } catch (NoSuchColumnFamilyException nscf) {<a name="line.3281"></a> -<span class="sourceLineNo">3282</span> LOG.warn("No such column family in batch mutation", nscf);<a name="line.3282"></a> -<span class="sourceLineNo">3283</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3283"></a> -<span class="sourceLineNo">3284</span> OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3284"></a> -<span class="sourceLineNo">3285</span> skip = true;<a name="line.3285"></a> -<span class="sourceLineNo">3286</span> } catch (FailedSanityCheckException fsce) {<a name="line.3286"></a> -<span class="sourceLineNo">3287</span> LOG.warn("Batch Mutation did not pass sanity check", fsce);<a name="line.3287"></a> -<span class="sourceLineNo">3288</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3288"></a> -<span class="sourceLineNo">3289</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3289"></a> -<span class="sourceLineNo">3290</span> skip = true;<a name="line.3290"></a> -<span class="sourceLineNo">3291</span> } catch (WrongRegionException we) {<a name="line.3291"></a> -<span class="sourceLineNo">3292</span> LOG.warn("Batch mutation had a row that does not belong to this region", we);<a name="line.3292"></a> -<span class="sourceLineNo">3293</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3293"></a> -<span class="sourceLineNo">3294</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3294"></a> -<span class="sourceLineNo">3295</span> skip = true;<a name="line.3295"></a> -<span class="sourceLineNo">3296</span> }<a name="line.3296"></a> -<span class="sourceLineNo">3297</span> return skip;<a name="line.3297"></a> -<span class="sourceLineNo">3298</span> }<a name="line.3298"></a> -<span class="sourceLineNo">3299</span><a name="line.3299"></a> -<span class="sourceLineNo">3300</span> /**<a name="line.3300"></a> -<span class="sourceLineNo">3301</span> * During replay, there could exist column families which are removed between region server<a name="line.3301"></a> -<span class="sourceLineNo">3302</span> * failure and replay<a name="line.3302"></a> -<span class="sourceLineNo">3303</span> */<a name="line.3303"></a> -<span class="sourceLineNo">3304</span> private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {<a name="line.3304"></a> -<span class="sourceLineNo">3305</span> List<byte[]> nonExistentList = null;<a name="line.3305"></a> -<span class="sourceLineNo">3306</span> for (byte[] family : familyMap.keySet()) {<a name="line.3306"></a> -<span class="sourceLineNo">3307</span> if (!this.htableDescriptor.hasFamily(family)) {<a name="line.3307"></a> -<span class="sourceLineNo">3308</span> if (nonExistentList == null) {<a name="line.3308"></a> -<span class="sourceLineNo">3309</span> nonExistentList = new ArrayList<byte[]>();<a name="line.3309"></a> -<span class="sourceLineNo">3310</span> }<a name="line.3310"></a> -<span class="sourceLineNo">3311</span> nonExistentList.add(family);<a name="line.3311"></a> -<span class="sourceLineNo">3312</span> }<a name="line.3312"></a> -<span class="sourceLineNo">3313</span> }<a name="line.3313"></a> -<span class="sourceLineNo">3314</span> if (nonExistentList != null) {<a name="line.3314"></a> -<span class="sourceLineNo">3315</span> for (byte[] family : nonExistentList) {<a name="line.3315"></a> -<span class="sourceLineNo">3316</span> // Perhaps schema was changed between crash and replay<a name="line.3316"></a> -<span class="sourceLineNo">3317</span> LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");<a name="line.3317"></a> -<span class="sourceLineNo">3318</span> familyMap.remove(family);<a name="line.3318"></a> -<span class="sourceLineNo">3319</span> }<a name="line.3319"></a> -<span class="sourceLineNo">3320</span> }<a name="line.3320"></a> -<span class="sourceLineNo">3321</span> }<a name="line.3321"></a> -<span class="sourceLineNo">3322</span><a name="line.3322"></a> -<span class="sourceLineNo">3323</span> /**<a name="line.3323"></a> -<span class="sourceLineNo">3324</span> * Returns effective durability from the passed durability and<a name="line.3324"></a> -<span class="sourceLineNo">3325</span> * the table descriptor.<a name="line.3325"></a> -<span class="sourceLineNo">3326</span> */<a name="line.3326"></a> -<span class="sourceLineNo">3327</span> protected Durability getEffectiveDurability(Durability d) {<a name="line.3327"></a> -<span class="sourceLineNo">3328</span> return d == Durability.USE_DEFAULT ? this.durability : d;<a name="line.3328"></a> -<span class="sourceLineNo">3329</span> }<a name="line.3329"></a> -<span class="sourceLineNo">3330</span><a name="line.3330"></a> -<span class="sourceLineNo">3331</span> @Override<a name="line.3331"></a> -<span class="sourceLineNo">3332</span> public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,<a name="line.3332"></a> -<span class="sourceLineNo">3333</span> CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,<a name="line.3333"></a> -<span class="sourceLineNo">3334</span> boolean writeToWAL)<a name="line.3334"></a> -<span class="sourceLineNo">3335</span> throws IOException{<a name="line.3335"></a> -<span class="sourceLineNo">3336</span> checkMutationType(mutation, row);<a name="line.3336"></a> -<span class="sourceLineNo">3337</span> return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null,<a name="line.3337"></a> -<span class="sourceLineNo">3338</span> mutation, writeToWAL);<a name="line.3338"></a> -<span class="sourceLineNo">3339</span> }<a name="line.3339"></a> -<span class="sourceLineNo">3340</span><a name="line.3340"></a> -<span class="sourceLineNo">3341</span> @Override<a name="line.3341"></a> -<span class="sourceLineNo">3342</span> public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,<a name="line.3342"></a> -<span class="sourceLineNo">3343</span> CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,<a name="line.3343"></a> -<span class="sourceLineNo">3344</span> boolean writeToWAL)<a name="line.3344"></a> -<span class="sourceLineNo">3345</span> throws IOException {<a name="line.3345"></a> -<span class="sourceLineNo">3346</span> return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null,<a name="line.3346"></a> -<span class="sourceLineNo">3347</span> writeToWAL);<a name="line.3347"></a> -<span class="sourceLineNo">3348</span> }<a name="line.3348"></a> -<span class="sourceLineNo">3349</span><a name="line.3349"></a> -<span class="sourceLineNo">3350</span> /**<a name="line.3350"></a> -<span class="sourceLineNo">3351</span> * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has<a name="line.3351"></a> -<span class="sourceLineNo">3352</span> * switches in the few places where there is deviation.<a name="line.3352"></a> -<span class="sourceLineNo">3353</span> */<a name="line.3353"></a> -<span class="sourceLineNo">3354</span> private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier,<a name="line.3354"></a> -<span class="sourceLineNo">3355</span> CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations,<a name="line.3355"></a> -<span class="sourceLineNo">3356</span> Mutation mutation, boolean writeToWAL)<a name="line.3356"></a> +<span class="sourceLineNo">3141</span> }<a name="line.3141"></a> +<span class="sourceLineNo">3142</span> if (walKey == null) {<a name="line.3142"></a> +<span class="sourceLineNo">3143</span> // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid.<a name="line.3143"></a> +<span class="sourceLineNo">3144</span> writeEntry = mvcc.begin();<a name="line.3144"></a> +<span class="sourceLineNo">3145</span> }<a name="line.3145"></a> +<span class="sourceLineNo">3146</span><a name="line.3146"></a> +<span class="sourceLineNo">3147</span> // STEP 5. Write back to memstore<a name="line.3147"></a> +<span class="sourceLineNo">3148</span> long addedSize = 0;<a name="line.3148"></a> +<span class="sourceLineNo">3149</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3149"></a> +<span class="sourceLineNo">3150</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3150"></a> +<span class="sourceLineNo">3151</span> continue;<a name="line.3151"></a> +<span class="sourceLineNo">3152</span> }<a name="line.3152"></a> +<span class="sourceLineNo">3153</span> addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,<a name="line.3153"></a> +<span class="sourceLineNo">3154</span> replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());<a name="line.3154"></a> +<span class="sourceLineNo">3155</span> }<a name="line.3155"></a> +<span class="sourceLineNo">3156</span><a name="line.3156"></a> +<span class="sourceLineNo">3157</span> // STEP 6. Complete mvcc.<a name="line.3157"></a> +<span class="sourceLineNo">3158</span> if (replay) {<a name="line.3158"></a> +<span class="sourceLineNo">3159</span> this.mvcc.advanceTo(batchOp.getReplaySequenceId());<a name="line.3159"></a> +<span class="sourceLineNo">3160</span> } else if (writeEntry != null/*Can be null if in replay mode*/) {<a name="line.3160"></a> +<span class="sourceLineNo">3161</span> mvcc.completeAndWait(writeEntry);<a name="line.3161"></a> +<span class="sourceLineNo">3162</span> writeEntry = null;<a name="line.3162"></a> +<span class="sourceLineNo">3163</span> }<a name="line.3163"></a> +<span class="sourceLineNo">3164</span><a name="line.3164"></a> +<span class="sourceLineNo">3165</span> // STEP 7. Release row locks, etc.<a name="line.3165"></a> +<span class="sourceLineNo">3166</span> if (locked) {<a name="line.3166"></a> +<span class="sourceLineNo">3167</span> this.updatesLock.readLock().unlock();<a name="line.3167"></a> +<span class="sourceLineNo">3168</span> locked = false;<a name="line.3168"></a> +<span class="sourceLineNo">3169</span> }<a name="line.3169"></a> +<span class="sourceLineNo">3170</span> releaseRowLocks(acquiredRowLocks);<a name="line.3170"></a> +<span class="sourceLineNo">3171</span><a name="line.3171"></a> +<span class="sourceLineNo">3172</span> // calling the post CP hook for batch mutation<a name="line.3172"></a> +<span class="sourceLineNo">3173</span> if (!replay && coprocessorHost != null) {<a name="line.3173"></a> +<span class="sourceLineNo">3174</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3174"></a> +<span class="sourceLineNo">3175</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3175"></a> +<span class="sourceLineNo">3176</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3176"></a> +<span class="sourceLineNo">3177</span> coprocessorHost.postBatchMutate(miniBatchOp);<a name="line.3177"></a> +<span class="sourceLineNo">3178</span> }<a name="line.3178"></a> +<span class="sourceLineNo">3179</span><a name="line.3179"></a> +<span class="sourceLineNo">3180</span> for (int i = firstIndex; i < lastIndexExclusive; i ++) {<a name="line.3180"></a> +<span class="sourceLineNo">3181</span> if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {<a name="line.3181"></a> +<span class="sourceLineNo">3182</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3182"></a> +<span class="sourceLineNo">3183</span> }<a name="line.3183"></a> +<span class="sourceLineNo">3184</span> }<a name="line.3184"></a> +<span class="sourceLineNo">3185</span><a name="line.3185"></a> +<span class="sourceLineNo">3186</span> // STEP 8. Run coprocessor post hooks. This should be done after the wal is<a name="line.3186"></a> +<span class="sourceLineNo">3187</span> // synced so that the coprocessor contract is adhered to.<a name="line.3187"></a> +<span class="sourceLineNo">3188</span> if (!replay && coprocessorHost != null) {<a name="line.3188"></a> +<span class="sourceLineNo">3189</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3189"></a> +<span class="sourceLineNo">3190</span> // only for successful puts<a name="line.3190"></a> +<span class="sourceLineNo">3191</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3191"></a> +<span class="sourceLineNo">3192</span> != OperationStatusCode.SUCCESS) {<a name="line.3192"></a> +<span class="sourceLineNo">3193</span> continue;<a name="line.3193"></a> +<span class="sourceLineNo">3194</span> }<a name="line.3194"></a> +<span class="sourceLineNo">3195</span> Mutation m = batchOp.getMutation(i);<a name="line.3195"></a> +<span class="sourceLineNo">3196</span> if (m instanceof Put) {<a name="line.3196"></a> +<span class="sourceLineNo">3197</span> coprocessorHost.postPut((Put) m, walEdit, m.getDurability());<a name="line.3197"></a> +<span class="sourceLineNo">3198</span> } else {<a name="line.3198"></a> +<span class="sourceLineNo">3199</span> coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());<a name="line.3199"></a> +<span class="sourceLineNo">3200</span> }<a name="line.3200"></a> +<span class="sourceLineNo">3201</span> }<a name="line.3201"></a> +<span class="sourceLineNo">3202</span> }<a name="line.3202"></a> +<span class="sourceLineNo">3203</span><a name="line.3203"></a> +<span class="sourceLineNo">3204</span> success = true;<a name="line.3204"></a> +<span class="sourceLineNo">3205</span> return addedSize;<a name="line.3205"></a> +<span class="sourceLineNo">3206</span> } finally {<a name="line.3206"></a> +<span class="sourceLineNo">3207</span> // Call complete rather than completeAndWait because we probably had error if walKey != null<a name="line.3207"></a> +<span class="sourceLineNo">3208</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.3208"></a> +<span class="sourceLineNo">3209</span> if (locked) {<a name="line.3209"></a> +<span class="sourceLineNo">3210</span> this.updatesLock.readLock().unlock();<a name="line.3210"></a> +<span class="sourceLineNo">3211</span> }<a name="line.3211"></a> +<span class="sourceLineNo">3212</span> releaseRowLocks(acquiredRowLocks);<a name="line.3212"></a> +<span class="sourceLineNo">3213</span><a name="line.3213"></a> +<span class="sourceLineNo">3214</span> // See if the column families were consistent through the whole thing.<a name="line.3214"></a> +<span class="sourceLineNo">3215</span> // if they were then keep them. If they were not then pass a null.<a name="line.3215"></a> +<span class="sourceLineNo">3216</span> // null will be treated as unknown.<a name="line.3216"></a> +<span class="sourceLineNo">3217</span> // Total time taken might be involving Puts and Deletes.<a name="line.3217"></a> +<span class="sourceLineNo">3218</span> // Split the time for puts and deletes based on the total number of Puts and Deletes.<a name="line.3218"></a> +<span class="sourceLineNo">3219</span><a name="line.3219"></a> +<span class="sourceLineNo">3220</span> if (noOfPuts > 0) {<a name="line.3220"></a> +<span class="sourceLineNo">3221</span> // There were some Puts in the batch.<a name="line.3221"></a> +<span class="sourceLineNo">3222</span> if (this.metricsRegion != null) {<a name="line.3222"></a> +<span class="sourceLineNo">3223</span> this.metricsRegion.updatePut();<a name="line.3223"></a> +<span class="sourceLineNo">3224</span> }<a name="line.3224"></a> +<span class="sourceLineNo">3225</span> }<a name="line.3225"></a> +<span class="sourceLineNo">3226</span> if (noOfDeletes > 0) {<a name="line.3226"></a> +<span class="sourceLineNo">3227</span> // There were some Deletes in the batch.<a name="line.3227"></a> +<span class="sourceLineNo">3228</span> if (this.metricsRegion != null) {<a name="line.3228"></a> +<span class="sourceLineNo">3229</span> this.metricsRegion.updateDelete();<a name="line.3229"></a> +<span class="sourceLineNo">3230</span> }<a name="line.3230"></a> +<span class="sourceLineNo">3231</span> }<a name="line.3231"></a> +<span class="sourceLineNo">3232</span> if (!success) {<a name="line.3232"></a> +<span class="sourceLineNo">3233</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3233"></a> +<span class="sourceLineNo">3234</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {<a name="line.3234"></a> +<span class="sourceLineNo">3235</span> batchOp.retCodeDetails[i] = OperationStatus.FAILURE;<a name="line.3235"></a> +<span class="sourceLineNo">3236</span> }<a name="line.3236"></a> +<span class="sourceLineNo">3237</span> }<a name="line.3237"></a> +<span class="sourceLineNo">3238</span> }<a name="line.3238"></a> +<span class="sourceLineNo">3239</span> if (coprocessorHost != null && !batchOp.isInReplay()) {<a name="line.3239"></a> +<span class="sourceLineNo">3240</span> // call the coprocessor hook to do any finalization steps<a name="line.3240"></a> +<span class="sourceLineNo">3241</span> // after the put is done<a name="line.3241"></a> +<span class="sourceLineNo">3242</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3242"></a> +<span class="sourceLineNo">3243</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3243"></a> +<span class="sourceLineNo">3244</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,<a name="line.3244"></a> +<span class="sourceLineNo">3245</span> lastIndexExclusive);<a name="line.3245"></a> +<span class="sourceLineNo">3246</span> coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);<a name="line.3246"></a> +<span class="sourceLineNo">3247</span> }<a name="line.3247"></a> +<span class="sourceLineNo">3248</span><a name="line.3248"></a> +<span class="sourceLineNo">3249</span> batchOp.nextIndexToProcess = lastIndexExclusive;<a name="line.3249"></a> +<span class="sourceLineNo">3250</span> }<a name="line.3250"></a> +<span class="sourceLineNo">3251</span> }<a name="line.3251"></a> +<span class="sourceLineNo">3252</span><a name="line.3252"></a> +<span class="sourceLineNo">3253</span> private void appendCurrentNonces(final Mutation mutation, final boolean replay,<a name="line.3253"></a> +<span class="sourceLineNo">3254</span> final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)<a name="line.3254"></a> +<span class="sourceLineNo">3255</span> throws IOException {<a name="line.3255"></a> +<span class="sourceLineNo">3256</span> if (walEdit.isEmpty()) return;<a name="line.3256"></a> +<span class="sourceLineNo">3257</span> if (!replay) throw new IOException("Multiple nonces per batch and not in replay");<a name="line.3257"></a> +<span class="sourceLineNo">3258</span> WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3258"></a> +<span class="sourceLineNo">3259</span> this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),<a name="line.3259"></a> +<span class="sourceLineNo">3260</span> currentNonceGroup, currentNonce, mvcc);<a name="line.3260"></a> +<span class="sourceLineNo">3261</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3261"></a> +<span class="sourceLineNo">3262</span> // Complete the mvcc transaction started down in append else it will block others<a name="line.3262"></a> +<span class="sourceLineNo">3263</span> this.mvcc.complete(walKey.getWriteEntry());<a name="line.3263"></a> +<span class="sourceLineNo">3264</span> }<a name="line.3264"></a> +<span class="sourceLineNo">3265</span><a name="line.3265"></a> +<span class="sourceLineNo">3266</span> private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,<a name="line.3266"></a> +<span class="sourceLineNo">3267</span> final Map<byte[], List<Cell>>[] familyMaps, final long now)<a name="line.3267"></a> +<span class="sourceLineNo">3268</span> throws IOException {<a name="line.3268"></a> +<span class="sourceLineNo">3269</span> boolean skip = false;<a name="line.3269"></a> +<span class="sourceLineNo">3270</span> // Skip anything that "ran" already<a name="line.3270"></a> +<span class="sourceLineNo">3271</span> if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()<a name="line.3271"></a> +<span class="sourceLineNo">3272</span> != OperationStatusCode.NOT_RUN) {<a name="line.3272"></a> +<span class="sourceLineNo">3273</span> return true;<a name="line.3273"></a> +<span class="sourceLineNo">3274</span> }<a name="line.3274"></a> +<span class="sourceLineNo">3275</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3275"></a> +<span class="sourceLineNo">3276</span> Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();<a name="line.3276"></a> +<span class="sourceLineNo">3277</span> // store the family map reference to allow for mutations<a name="line.3277"></a> +<span class="sourceLineNo">3278</span> familyMaps[lastIndexExclusive] = familyMap;<a name="line.3278"></a> +<span class="sourceLineNo">3279</span><a name="line.3279"></a> +<span class="sourceLineNo">3280</span> try {<a name="line.3280"></a> +<span class="sourceLineNo">3281</span> if (mutation instanceof Put) {<a name="line.3281"></a> +<span class="sourceLineNo">3282</span> // Check the families in the put. If bad, skip this one.<a name="line.3282"></a> +<span class="sourceLineNo">3283</span> if (batchOp.isInReplay()) {<a name="line.3283"></a> +<span class="sourceLineNo">3284</span> removeNonExistentColumnFamilyForReplay(familyMap);<a name="line.3284"></a> +<span class="sourceLineNo">3285</span> } else {<a name="line.3285"></a> +<span class="sourceLineNo">3286</span> checkFamilies(familyMap.keySet());<a name="line.3286"></a> +<span class="sourceLineNo">3287</span> }<a name="line.3287"></a> +<span class="sourceLineNo">3288</span> checkTimestamps(mutation.getFamilyCellMap(), now);<a name="line.3288"></a> +<span class="sourceLineNo">3289</span> } else {<a name="line.3289"></a> +<span class="sourceLineNo">3290</span> prepareDelete((Delete)mutation);<a name="line.3290"></a> +<span class="sourceLineNo">3291</span> }<a name="line.3291"></a> +<span class="sourceLineNo">3292</span> checkRow(mutation.getRow(), "doMiniBatchMutation");<a name="line.3292"></a> +<span class="sourceLineNo">3293</span> } catch (NoSuchColumnFamilyException nscf) {<a name="line.3293"></a> +<span class="sourceLineNo">3294</span> LOG.warn("No such column family in batch mutation", nscf);<a name="line.3294"></a> +<span class="sourceLineNo">3295</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3295"></a> +<span class="sourceLineNo">3296</span> OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3296"></a> +<span class="sourceLineNo">3297</span> skip = true;<a name="line.3297"></a> +<span class="sourceLineNo">3298</span> } catch (FailedSanityCheckException fsce) {<a name="line.3298"></a> +<span class="sourceLineNo">3299</span> LOG.warn("Batch Mutation did not pass sanity check", fsce);<a name="line.3299"></a> +<span class="sourceLineNo">3300</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3300"></a> +<span class="sourceLineNo">3301</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3301"></a> +<span class="sourceLineNo">3302</span> skip = true;<a name="line.3302"></a> +<span class="sourceLineNo">3303</span> } catch (WrongRegionException we) {<a name="line.3303"></a> +<span class="sourceLineNo">3304</span> LOG.warn("Batch mutation had a row that does not belong to this region", we);<a name="line.3304"></a> +<span class="sourceLineNo">3305</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3305"></a> +<span class="sourceLineNo">3306</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3306"></a> +<span class="sourceLineNo">3307</span> skip = true;<a name="line.3307"></a> +<span class="sourceLineNo">3308</span> }<a name="line.3308"></a> +<span class="sourceLineNo">3309</span> return skip;<a name="line.3309"></a> +<span class="sourceLineNo">3310</span> }<a name="line.3310"></a> +<span class="sourceLineNo">3311</span><a name="line.3311"></a> +<span class="sourceLineNo">3312</span> /**<a name="line.3312"></a> +<span class="sourceLineNo">3313</span> * During replay, there could exist column families which are removed between region server<a name="line.3313"></a> +<span class="sourceLineNo">3314</span> * failure and replay<a name="line.3314"></a> +<span class="sourceLineNo">3315</span> */<a name="line.3315"></a> +<span class="sourceLineNo">3316</span> private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {<a name="line.3316"></a> +<span class="sourceLineNo">3317</span> List<byte[]> nonExistentList = null;<a name="line.3317"></a> +<span class="sourceLineNo">3318</span> for (byte[] family : familyMap.keySet()) {<a name="line.3318"></a> +<span class="sourceLineNo">3319</span> if (!this.htableDescriptor.hasFamily(family)) {<a name="line.3319"></a> +<span class="sourceLineNo">3320</span> if (nonExistentList == null) {<a name="line.3320"></a> +<span class="sourceLineNo">3321</span> nonExistentList = new ArrayList<byte[]>();<a name="line.3321"></a> +<span class="sourceLineNo">3322</span> }<a name="line.3322"></a> +<span class="sourceLineNo">3323</span> nonExistentList.add(family);<a name="line.3323"></a> +<span class="sourceLineNo">3324</span> }<a name="line.3324"></a> +<span class="sourceLineNo">3325</span> }<a name="line.3325"></a> +<span class="sourceLineNo">3326</span> if (nonExistentList != null) {<a name="line.3326"></a> +<span class="sourceLineNo">3327</span> for (byte[] family : nonExistentList) {<a name="line.3327"></a> +<span class="sourceLineNo">3328</span> // Perhaps schema was changed between crash and replay<a name="line.3328"></a> +<span class="sourceLineNo">3329</span> LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");<a name="line.3329"></a> +<span class="sourceLineNo">3330</span> familyMap.remove(family);<a name="line.3330"></a> +<span class="sourceLineNo">3331</span> }<a name="line.3331"></a> +<span class="sourceLineNo">3332</span> }<a name="line.3332"></a> +<span class="sourceLineNo">3333</span> }<a name="line.3333"></a> +<span class="sourceLineNo">3334</span><a name="line.3334"></a> +<span class="sourceLineNo">3335</span> /**<a name="line.3335"></a> +<span class="sourceLineNo">3336</span> * Returns effective durability from the passed durability and<a name="line.3336"></a> +<span class="sourceLineNo">3337</span> * the table descriptor.<a name="line.3337"></a> +<span class="sourceLineNo">3338</span> */<a name="line.3338"></a> +<span class="sourceLineNo">3339</span> protected Durability getEffectiveDurability(Durability d) {<a name="line.3339"></a> +<span class="sourceLineNo">3340</span> return d == Durability.USE_DEFAULT ? this.durability : d;<a name="line.3340"></a> +<span class="sourceLineNo">3341</span> }<a name="line.3341"></a> +<span class="sourceLineNo">3342</span><a name="line.3342"></a> +<span class="sourceLineNo">3343</span> @Override<a name="line.3343"></a> +<span class="sourceLineNo">3344</span> public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,<a name="line.3344"></a> +<span class="sourceLineNo">3345</span> CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,<a name="line.3345"></a> +<span class="sourceLineNo">3346</span> boolean writeToWAL)<a name="line.3346"></a> +<span class="sourceLineNo">3347</span> throws IOException{<a name="line.3347"></a> +<span class="sourceLineNo">3348</span> checkMutationType(mutation, row);<a name="line.3348"></a> +<span class="sourceLineNo">3349</span> return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null,<a name="line.3349"></a> +<span class="sourceLineNo">3350</span> mutation, writeToWAL);<a name="line.3350"></a> +<span class="sourceLineNo">3351</span> }<a name="line.3351"></a> +<span class="sourceLineNo">3352</span><a name="line.3352"></a> +<span class="sourceLineNo">3353</span> @Override<a name="line.3353"></a> +<span class="sourceLineNo">3354</span> public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,<a name="line.3354"></a> +<span class="sourceLineNo">3355</span> CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,<a name="line.3355"></a> +<span class="sourceLineNo">3356</span> boolean writeToWAL)<a name="line.3356"></a> <span class="sourceLineNo">3357</span> throws IOException {<a name="line.3357"></a> -<span class="sourceLineNo">3358</span> // Could do the below checks but seems wacky with two callers only. Just comment out for now.<a name="line.3358"></a> -<span class="sourceLineNo">3359</span> // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't<a name="line.3359"></a> -<span class="sourceLineNo">3360</span> // need these commented out checks.<a name="line.3360"></a> -<span class="sourceLineNo">3361</span> // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");<a name="line.3361"></a> -<span class="sourceLineNo">3362</span> // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");<a name="line.3362"></a> -<span class="sourceLineNo">3363</span> checkReadOnly();<a name="line.3363"></a> -<span class="sourceLineNo">3364</span> // TODO, add check for value length also move this check to the client<a name="line.3364"></a> -<span class="sourceLineNo">3365</span> checkResources();<a name="line.3365"></a> -<span class="sourceLineNo">3366</span> startRegionOperation();<a name="line.3366"></a> -<span class="sourceLineNo">3367</span> try {<a name="line.3367"></a> -<span class="sourceLineNo">3368</span> Get get = new Get(row);<a name="line.3368"></a> -<span class="sourceLineNo">3369</span> checkFamily(family);<a name="line.3369"></a> -<span class="sourceLineNo">3370</span> get.addColumn(family, qualifier);<a name="line.3370"></a> -<span class="sourceLineNo">3371</span> // Lock row - note that doBatchMutate will relock this row if called<a name="line.3371"></a> -<span class="sourceLineNo">3372</span> RowLock rowLock = getRowLock(get.getRow());<a name="line.3372"></a> -<span class="sourceLineNo">3373</span> try {<a name="line.3373"></a> -<span class="sourceLineNo">3374</span> if (mutation != null && this.getCoprocessorHost() != null) {<a name="line.3374"></a> -<span class="sourceLineNo">3375</span> // Call coprocessor.<a name="line.3375"></a> -<span class="sourceLineNo">3376</span> Boolean processed = null;<a name="line.3376"></a> -<span class="sourceLineNo">3377</span> if (mutation instanceof Put) {<a name="line.3377"></a> -<span class="sourceLineNo">3378</span> processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,<a name="line.3378"></a> -<span class="sourceLineNo">3379</span> qualifier, compareOp, comparator, (Put)mutation);<a name="line.3379"></a> -<span class="sourceLineNo">3380</span> } else if (mutation instanceof Delete) {<a name="line.3380"></a> -<span class="sourceLineNo">3381</span> processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,<a name="line.3381"></a> -<span class="sourceLineNo">3382</span> qualifier, compareOp, comparator, (Delete)mutation);<a name="line.3382"></a> -<span class="sourceLineNo">3383</span> }<a name="line.3383"></a> -<span class="sourceLineNo">3384</span> if (processed != null) {<a name="line.3384"></a> -<span class="sourceLineNo">3385</span> return processed;<a name="line.3385"></a> -<span class="sourceLineNo">3386</span> }<a name="line.3386"></a> -<span class="sourceLineNo">3387</span> }<a name="line.3387"></a> -<span class="sourceLineNo">3388</span> // NOTE: We used to wait here until mvcc caught up: mvcc.await();<a name="line.3388"></a> -<span class="sourceLineNo">3389</span> // Supposition is that now all changes are done under row locks, then when we go to read,<a name="line.3389"></a> -<span class="sourceLineNo">3390</span> // we'll get the latest on this row.<a name="line.3390"></a> -<span class="sourceLineNo">3391</span> List<Cell> result = get(get, false);<a name="line.3391"></a> -<span class="sourceLineNo">3392</span> boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;<a name="line.3392"></a> -<span class="sourceLineNo">3393</span> boolean matches = false;<a name="line.3393"></a> -<span class="sourceLineNo">3394</span> long cellTs = 0;<a name="line.3394"></a> -<span class="sourceLineNo">3395</span> if (result.size() == 0 && valueIsNull) {<a name="line.3395"></a> -<span class="sourceLineNo">3396</span> matches = true;<a name="line.3396"></a> -<span class="sourceLineNo">3397</span> } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {<a name="line.3397"></a> -<span class="sourceLineNo">3398</span> matches = true;<a name="line.3398"></a> -<span class="sourceLineNo">3399</span> cellTs = result.get(0).getTimestamp();<a name="line.3399"></a> -<span class="sourceLineNo">3400</span> } else if (result.size() == 1 && !valueIsNull) {<a name="line.3400"></a> -<span class="sourceLineNo">3401</span> Cell kv = result.get(0);<a name="line.3401"></a> -<span class="sourceLineNo">3402</span> cellTs = kv.getTimestamp();<a name="line.3402"></a> -<span class="sourceLineNo">3403</span> int compareResult = CellComparator.compareValue(kv, comparator);<a name="line.3403"></a> -<span class="sourceLineNo">3404</span> matches = matches(compareOp, compareResult);<a name="line.3404"></a> -<span class="sourceLineNo">3405</span> }<a name="line.3405"></a> -<span class="sourceLineNo">3406</span> // If matches put the new put or delete the new delete<a name="line.3406"></a> -<span class="sourceLineNo">3407</span> if (matches) {<a name="line.3407"></a> -<span class="sourceLineNo">3408</span> // We have acquired the row lock alrea
<TRUNCATED>