http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9eba7fcf/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html index 504e470..38667c0 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html @@ -2866,5375 +2866,5371 @@ <span class="sourceLineNo">2858</span> checkResources();<a name="line.2858"></a> <span class="sourceLineNo">2859</span> startRegionOperation(Operation.DELETE);<a name="line.2859"></a> <span class="sourceLineNo">2860</span> try {<a name="line.2860"></a> -<span class="sourceLineNo">2861</span> delete.getRow();<a name="line.2861"></a> -<span class="sourceLineNo">2862</span> // All edits for the given row (across all column families) must happen atomically.<a name="line.2862"></a> -<span class="sourceLineNo">2863</span> doBatchMutate(delete);<a name="line.2863"></a> -<span class="sourceLineNo">2864</span> } finally {<a name="line.2864"></a> -<span class="sourceLineNo">2865</span> closeRegionOperation(Operation.DELETE);<a name="line.2865"></a> -<span class="sourceLineNo">2866</span> }<a name="line.2866"></a> -<span class="sourceLineNo">2867</span> }<a name="line.2867"></a> -<span class="sourceLineNo">2868</span><a name="line.2868"></a> -<span class="sourceLineNo">2869</span> /**<a name="line.2869"></a> -<span class="sourceLineNo">2870</span> * Row needed by below method.<a name="line.2870"></a> -<span class="sourceLineNo">2871</span> */<a name="line.2871"></a> -<span class="sourceLineNo">2872</span> private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");<a name="line.2872"></a> -<span class="sourceLineNo">2873</span><a name="line.2873"></a> -<span class="sourceLineNo">2874</span> /**<a name="line.2874"></a> -<span class="sourceLineNo">2875</span> * This is used only by unit tests. Not required to be a public API.<a name="line.2875"></a> -<span class="sourceLineNo">2876</span> * @param familyMap map of family to edits for the given family.<a name="line.2876"></a> -<span class="sourceLineNo">2877</span> * @throws IOException<a name="line.2877"></a> -<span class="sourceLineNo">2878</span> */<a name="line.2878"></a> -<span class="sourceLineNo">2879</span> void delete(NavigableMap<byte[], List<Cell>> familyMap,<a name="line.2879"></a> -<span class="sourceLineNo">2880</span> Durability durability) throws IOException {<a name="line.2880"></a> -<span class="sourceLineNo">2881</span> Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);<a name="line.2881"></a> -<span class="sourceLineNo">2882</span> delete.setFamilyCellMap(familyMap);<a name="line.2882"></a> -<span class="sourceLineNo">2883</span> delete.setDurability(durability);<a name="line.2883"></a> -<span class="sourceLineNo">2884</span> doBatchMutate(delete);<a name="line.2884"></a> -<span class="sourceLineNo">2885</span> }<a name="line.2885"></a> -<span class="sourceLineNo">2886</span><a name="line.2886"></a> -<span class="sourceLineNo">2887</span> @Override<a name="line.2887"></a> -<span class="sourceLineNo">2888</span> public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,<a name="line.2888"></a> -<span class="sourceLineNo">2889</span> byte[] byteNow) throws IOException {<a name="line.2889"></a> -<span class="sourceLineNo">2890</span> for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {<a name="line.2890"></a> -<span class="sourceLineNo">2891</span><a name="line.2891"></a> -<span class="sourceLineNo">2892</span> byte[] family = e.getKey();<a name="line.2892"></a> -<span class="sourceLineNo">2893</span> List<Cell> cells = e.getValue();<a name="line.2893"></a> -<span class="sourceLineNo">2894</span> assert cells instanceof RandomAccess;<a name="line.2894"></a> -<span class="sourceLineNo">2895</span><a name="line.2895"></a> -<span class="sourceLineNo">2896</span> Map<byte[], Integer> kvCount = new TreeMap<>(Bytes.BYTES_COMPARATOR);<a name="line.2896"></a> -<span class="sourceLineNo">2897</span> int listSize = cells.size();<a name="line.2897"></a> -<span class="sourceLineNo">2898</span> for (int i=0; i < listSize; i++) {<a name="line.2898"></a> -<span class="sourceLineNo">2899</span> Cell cell = cells.get(i);<a name="line.2899"></a> -<span class="sourceLineNo">2900</span> // Check if time is LATEST, change to time of most recent addition if so<a name="line.2900"></a> -<span class="sourceLineNo">2901</span> // This is expensive.<a name="line.2901"></a> -<span class="sourceLineNo">2902</span> if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {<a name="line.2902"></a> -<span class="sourceLineNo">2903</span> byte[] qual = CellUtil.cloneQualifier(cell);<a name="line.2903"></a> -<span class="sourceLineNo">2904</span> if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;<a name="line.2904"></a> -<span class="sourceLineNo">2905</span><a name="line.2905"></a> -<span class="sourceLineNo">2906</span> Integer count = kvCount.get(qual);<a name="line.2906"></a> -<span class="sourceLineNo">2907</span> if (count == null) {<a name="line.2907"></a> -<span class="sourceLineNo">2908</span> kvCount.put(qual, 1);<a name="line.2908"></a> -<span class="sourceLineNo">2909</span> } else {<a name="line.2909"></a> -<span class="sourceLineNo">2910</span> kvCount.put(qual, count + 1);<a name="line.2910"></a> -<span class="sourceLineNo">2911</span> }<a name="line.2911"></a> -<span class="sourceLineNo">2912</span> count = kvCount.get(qual);<a name="line.2912"></a> -<span class="sourceLineNo">2913</span><a name="line.2913"></a> -<span class="sourceLineNo">2914</span> Get get = new Get(CellUtil.cloneRow(cell));<a name="line.2914"></a> -<span class="sourceLineNo">2915</span> get.setMaxVersions(count);<a name="line.2915"></a> -<span class="sourceLineNo">2916</span> get.addColumn(family, qual);<a name="line.2916"></a> -<span class="sourceLineNo">2917</span> if (coprocessorHost != null) {<a name="line.2917"></a> -<span class="sourceLineNo">2918</span> if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,<a name="line.2918"></a> -<span class="sourceLineNo">2919</span> byteNow, get)) {<a name="line.2919"></a> -<span class="sourceLineNo">2920</span> updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);<a name="line.2920"></a> -<span class="sourceLineNo">2921</span> }<a name="line.2921"></a> -<span class="sourceLineNo">2922</span> } else {<a name="line.2922"></a> -<span class="sourceLineNo">2923</span> updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);<a name="line.2923"></a> -<span class="sourceLineNo">2924</span> }<a name="line.2924"></a> -<span class="sourceLineNo">2925</span> } else {<a name="line.2925"></a> -<span class="sourceLineNo">2926</span> CellUtil.updateLatestStamp(cell, byteNow, 0);<a name="line.2926"></a> -<span class="sourceLineNo">2927</span> }<a name="line.2927"></a> -<span class="sourceLineNo">2928</span> }<a name="line.2928"></a> -<span class="sourceLineNo">2929</span> }<a name="line.2929"></a> -<span class="sourceLineNo">2930</span> }<a name="line.2930"></a> -<span class="sourceLineNo">2931</span><a name="line.2931"></a> -<span class="sourceLineNo">2932</span> void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)<a name="line.2932"></a> -<span class="sourceLineNo">2933</span> throws IOException {<a name="line.2933"></a> -<span class="sourceLineNo">2934</span> List<Cell> result = get(get, false);<a name="line.2934"></a> -<span class="sourceLineNo">2935</span><a name="line.2935"></a> -<span class="sourceLineNo">2936</span> if (result.size() < count) {<a name="line.2936"></a> -<span class="sourceLineNo">2937</span> // Nothing to delete<a name="line.2937"></a> -<span class="sourceLineNo">2938</span> CellUtil.updateLatestStamp(cell, byteNow, 0);<a name="line.2938"></a> -<span class="sourceLineNo">2939</span> return;<a name="line.2939"></a> -<span class="sourceLineNo">2940</span> }<a name="line.2940"></a> -<span class="sourceLineNo">2941</span> if (result.size() > count) {<a name="line.2941"></a> -<span class="sourceLineNo">2942</span> throw new RuntimeException("Unexpected size: " + result.size());<a name="line.2942"></a> -<span class="sourceLineNo">2943</span> }<a name="line.2943"></a> -<span class="sourceLineNo">2944</span> Cell getCell = result.get(count - 1);<a name="line.2944"></a> -<span class="sourceLineNo">2945</span> CellUtil.setTimestamp(cell, getCell.getTimestamp());<a name="line.2945"></a> -<span class="sourceLineNo">2946</span> }<a name="line.2946"></a> -<span class="sourceLineNo">2947</span><a name="line.2947"></a> -<span class="sourceLineNo">2948</span> @Override<a name="line.2948"></a> -<span class="sourceLineNo">2949</span> public void put(Put put) throws IOException {<a name="line.2949"></a> -<span class="sourceLineNo">2950</span> checkReadOnly();<a name="line.2950"></a> -<span class="sourceLineNo">2951</span><a name="line.2951"></a> -<span class="sourceLineNo">2952</span> // Do a rough check that we have resources to accept a write. The check is<a name="line.2952"></a> -<span class="sourceLineNo">2953</span> // 'rough' in that between the resource check and the call to obtain a<a name="line.2953"></a> -<span class="sourceLineNo">2954</span> // read lock, resources may run out. For now, the thought is that this<a name="line.2954"></a> -<span class="sourceLineNo">2955</span> // will be extremely rare; we'll deal with it when it happens.<a name="line.2955"></a> -<span class="sourceLineNo">2956</span> checkResources();<a name="line.2956"></a> -<span class="sourceLineNo">2957</span> startRegionOperation(Operation.PUT);<a name="line.2957"></a> -<span class="sourceLineNo">2958</span> try {<a name="line.2958"></a> -<span class="sourceLineNo">2959</span> // All edits for the given row (across all column families) must happen atomically.<a name="line.2959"></a> -<span class="sourceLineNo">2960</span> doBatchMutate(put);<a name="line.2960"></a> -<span class="sourceLineNo">2961</span> } finally {<a name="line.2961"></a> -<span class="sourceLineNo">2962</span> closeRegionOperation(Operation.PUT);<a name="line.2962"></a> -<span class="sourceLineNo">2963</span> }<a name="line.2963"></a> -<span class="sourceLineNo">2964</span> }<a name="line.2964"></a> -<span class="sourceLineNo">2965</span><a name="line.2965"></a> -<span class="sourceLineNo">2966</span> /**<a name="line.2966"></a> -<span class="sourceLineNo">2967</span> * Struct-like class that tracks the progress of a batch operation,<a name="line.2967"></a> -<span class="sourceLineNo">2968</span> * accumulating status codes and tracking the index at which processing<a name="line.2968"></a> -<span class="sourceLineNo">2969</span> * is proceeding.<a name="line.2969"></a> -<span class="sourceLineNo">2970</span> */<a name="line.2970"></a> -<span class="sourceLineNo">2971</span> private abstract static class BatchOperation<T> {<a name="line.2971"></a> -<span class="sourceLineNo">2972</span> T[] operations;<a name="line.2972"></a> -<span class="sourceLineNo">2973</span> int nextIndexToProcess = 0;<a name="line.2973"></a> -<span class="sourceLineNo">2974</span> OperationStatus[] retCodeDetails;<a name="line.2974"></a> -<span class="sourceLineNo">2975</span> WALEdit[] walEditsFromCoprocessors;<a name="line.2975"></a> -<span class="sourceLineNo">2976</span><a name="line.2976"></a> -<span class="sourceLineNo">2977</span> public BatchOperation(T[] operations) {<a name="line.2977"></a> -<span class="sourceLineNo">2978</span> this.operations = operations;<a name="line.2978"></a> -<span class="sourceLineNo">2979</span> this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2979"></a> -<span class="sourceLineNo">2980</span> this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2980"></a> -<span class="sourceLineNo">2981</span> Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2981"></a> -<span class="sourceLineNo">2982</span> }<a name="line.2982"></a> -<span class="sourceLineNo">2983</span><a name="line.2983"></a> -<span class="sourceLineNo">2984</span> public abstract Mutation getMutation(int index);<a name="line.2984"></a> -<span class="sourceLineNo">2985</span> public abstract long getNonceGroup(int index);<a name="line.2985"></a> -<span class="sourceLineNo">2986</span> public abstract long getNonce(int index);<a name="line.2986"></a> -<span class="sourceLineNo">2987</span> /** This method is potentially expensive and should only be used for non-replay CP path. */<a name="line.2987"></a> -<span class="sourceLineNo">2988</span> public abstract Mutation[] getMutationsForCoprocs();<a name="line.2988"></a> -<span class="sourceLineNo">2989</span> public abstract boolean isInReplay();<a name="line.2989"></a> -<span class="sourceLineNo">2990</span> public abstract long getReplaySequenceId();<a name="line.2990"></a> -<span class="sourceLineNo">2991</span><a name="line.2991"></a> -<span class="sourceLineNo">2992</span> public boolean isDone() {<a name="line.2992"></a> -<span class="sourceLineNo">2993</span> return nextIndexToProcess == operations.length;<a name="line.2993"></a> -<span class="sourceLineNo">2994</span> }<a name="line.2994"></a> -<span class="sourceLineNo">2995</span> }<a name="line.2995"></a> -<span class="sourceLineNo">2996</span><a name="line.2996"></a> -<span class="sourceLineNo">2997</span> private static class MutationBatch extends BatchOperation<Mutation> {<a name="line.2997"></a> -<span class="sourceLineNo">2998</span> private long nonceGroup;<a name="line.2998"></a> -<span class="sourceLineNo">2999</span> private long nonce;<a name="line.2999"></a> -<span class="sourceLineNo">3000</span> public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {<a name="line.3000"></a> -<span class="sourceLineNo">3001</span> super(operations);<a name="line.3001"></a> -<span class="sourceLineNo">3002</span> this.nonceGroup = nonceGroup;<a name="line.3002"></a> -<span class="sourceLineNo">3003</span> this.nonce = nonce;<a name="line.3003"></a> -<span class="sourceLineNo">3004</span> }<a name="line.3004"></a> -<span class="sourceLineNo">3005</span><a name="line.3005"></a> -<span class="sourceLineNo">3006</span> @Override<a name="line.3006"></a> -<span class="sourceLineNo">3007</span> public Mutation getMutation(int index) {<a name="line.3007"></a> -<span class="sourceLineNo">3008</span> return this.operations[index];<a name="line.3008"></a> -<span class="sourceLineNo">3009</span> }<a name="line.3009"></a> -<span class="sourceLineNo">3010</span><a name="line.3010"></a> -<span class="sourceLineNo">3011</span> @Override<a name="line.3011"></a> -<span class="sourceLineNo">3012</span> public long getNonceGroup(int index) {<a name="line.3012"></a> -<span class="sourceLineNo">3013</span> return nonceGroup;<a name="line.3013"></a> -<span class="sourceLineNo">3014</span> }<a name="line.3014"></a> -<span class="sourceLineNo">3015</span><a name="line.3015"></a> -<span class="sourceLineNo">3016</span> @Override<a name="line.3016"></a> -<span class="sourceLineNo">3017</span> public long getNonce(int index) {<a name="line.3017"></a> -<span class="sourceLineNo">3018</span> return nonce;<a name="line.3018"></a> -<span class="sourceLineNo">3019</span> }<a name="line.3019"></a> -<span class="sourceLineNo">3020</span><a name="line.3020"></a> -<span class="sourceLineNo">3021</span> @Override<a name="line.3021"></a> -<span class="sourceLineNo">3022</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3022"></a> -<span class="sourceLineNo">3023</span> return this.operations;<a name="line.3023"></a> -<span class="sourceLineNo">3024</span> }<a name="line.3024"></a> -<span class="sourceLineNo">3025</span><a name="line.3025"></a> -<span class="sourceLineNo">3026</span> @Override<a name="line.3026"></a> -<span class="sourceLineNo">3027</span> public boolean isInReplay() {<a name="line.3027"></a> -<span class="sourceLineNo">3028</span> return false;<a name="line.3028"></a> -<span class="sourceLineNo">3029</span> }<a name="line.3029"></a> -<span class="sourceLineNo">3030</span><a name="line.3030"></a> -<span class="sourceLineNo">3031</span> @Override<a name="line.3031"></a> -<span class="sourceLineNo">3032</span> public long getReplaySequenceId() {<a name="line.3032"></a> -<span class="sourceLineNo">3033</span> return 0;<a name="line.3033"></a> -<span class="sourceLineNo">3034</span> }<a name="line.3034"></a> -<span class="sourceLineNo">3035</span> }<a name="line.3035"></a> -<span class="sourceLineNo">3036</span><a name="line.3036"></a> -<span class="sourceLineNo">3037</span> private static class ReplayBatch extends BatchOperation<MutationReplay> {<a name="line.3037"></a> -<span class="sourceLineNo">3038</span> private long replaySeqId = 0;<a name="line.3038"></a> -<span class="sourceLineNo">3039</span> public ReplayBatch(MutationReplay[] operations, long seqId) {<a name="line.3039"></a> -<span class="sourceLineNo">3040</span> super(operations);<a name="line.3040"></a> -<span class="sourceLineNo">3041</span> this.replaySeqId = seqId;<a name="line.3041"></a> -<span class="sourceLineNo">3042</span> }<a name="line.3042"></a> -<span class="sourceLineNo">3043</span><a name="line.3043"></a> -<span class="sourceLineNo">3044</span> @Override<a name="line.3044"></a> -<span class="sourceLineNo">3045</span> public Mutation getMutation(int index) {<a name="line.3045"></a> -<span class="sourceLineNo">3046</span> return this.operations[index].mutation;<a name="line.3046"></a> -<span class="sourceLineNo">3047</span> }<a name="line.3047"></a> -<span class="sourceLineNo">3048</span><a name="line.3048"></a> -<span class="sourceLineNo">3049</span> @Override<a name="line.3049"></a> -<span class="sourceLineNo">3050</span> public long getNonceGroup(int index) {<a name="line.3050"></a> -<span class="sourceLineNo">3051</span> return this.operations[index].nonceGroup;<a name="line.3051"></a> -<span class="sourceLineNo">3052</span> }<a name="line.3052"></a> -<span class="sourceLineNo">3053</span><a name="line.3053"></a> -<span class="sourceLineNo">3054</span> @Override<a name="line.3054"></a> -<span class="sourceLineNo">3055</span> public long getNonce(int index) {<a name="line.3055"></a> -<span class="sourceLineNo">3056</span> return this.operations[index].nonce;<a name="line.3056"></a> -<span class="sourceLineNo">3057</span> }<a name="line.3057"></a> -<span class="sourceLineNo">3058</span><a name="line.3058"></a> -<span class="sourceLineNo">3059</span> @Override<a name="line.3059"></a> -<span class="sourceLineNo">3060</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3060"></a> -<span class="sourceLineNo">3061</span> assert false;<a name="line.3061"></a> -<span class="sourceLineNo">3062</span> throw new RuntimeException("Should not be called for replay batch");<a name="line.3062"></a> -<span class="sourceLineNo">3063</span> }<a name="line.3063"></a> -<span class="sourceLineNo">3064</span><a name="line.3064"></a> -<span class="sourceLineNo">3065</span> @Override<a name="line.3065"></a> -<span class="sourceLineNo">3066</span> public boolean isInReplay() {<a name="line.3066"></a> -<span class="sourceLineNo">3067</span> return true;<a name="line.3067"></a> -<span class="sourceLineNo">3068</span> }<a name="line.3068"></a> -<span class="sourceLineNo">3069</span><a name="line.3069"></a> -<span class="sourceLineNo">3070</span> @Override<a name="line.3070"></a> -<span class="sourceLineNo">3071</span> public long getReplaySequenceId() {<a name="line.3071"></a> -<span class="sourceLineNo">3072</span> return this.replaySeqId;<a name="line.3072"></a> -<span class="sourceLineNo">3073</span> }<a name="line.3073"></a> -<span class="sourceLineNo">3074</span> }<a name="line.3074"></a> -<span class="sourceLineNo">3075</span><a name="line.3075"></a> -<span class="sourceLineNo">3076</span> @Override<a name="line.3076"></a> -<span class="sourceLineNo">3077</span> public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)<a name="line.3077"></a> -<span class="sourceLineNo">3078</span> throws IOException {<a name="line.3078"></a> -<span class="sourceLineNo">3079</span> // As it stands, this is used for 3 things<a name="line.3079"></a> -<span class="sourceLineNo">3080</span> // * batchMutate with single mutation - put/delete, separate or from checkAndMutate.<a name="line.3080"></a> -<span class="sourceLineNo">3081</span> // * coprocessor calls (see ex. BulkDeleteEndpoint).<a name="line.3081"></a> -<span class="sourceLineNo">3082</span> // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...<a name="line.3082"></a> -<span class="sourceLineNo">3083</span> return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));<a name="line.3083"></a> -<span class="sourceLineNo">3084</span> }<a name="line.3084"></a> -<span class="sourceLineNo">3085</span><a name="line.3085"></a> -<span class="sourceLineNo">3086</span> public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {<a name="line.3086"></a> -<span class="sourceLineNo">3087</span> return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.3087"></a> -<span class="sourceLineNo">3088</span> }<a name="line.3088"></a> -<span class="sourceLineNo">3089</span><a name="line.3089"></a> -<span class="sourceLineNo">3090</span> @Override<a name="line.3090"></a> -<span class="sourceLineNo">3091</span> public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)<a name="line.3091"></a> -<span class="sourceLineNo">3092</span> throws IOException {<a name="line.3092"></a> -<span class="sourceLineNo">3093</span> if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())<a name="line.3093"></a> -<span class="sourceLineNo">3094</span> && replaySeqId < lastReplayedOpenRegionSeqId) {<a name="line.3094"></a> -<span class="sourceLineNo">3095</span> // if it is a secondary replica we should ignore these entries silently<a name="line.3095"></a> -<span class="sourceLineNo">3096</span> // since they are coming out of order<a name="line.3096"></a> -<span class="sourceLineNo">3097</span> if (LOG.isTraceEnabled()) {<a name="line.3097"></a> -<span class="sourceLineNo">3098</span> LOG.trace(getRegionInfo().getEncodedName() + " : "<a name="line.3098"></a> -<span class="sourceLineNo">3099</span> + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId<a name="line.3099"></a> -<span class="sourceLineNo">3100</span> + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);<a name="line.3100"></a> -<span class="sourceLineNo">3101</span> for (MutationReplay mut : mutations) {<a name="line.3101"></a> -<span class="sourceLineNo">3102</span> LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);<a name="line.3102"></a> -<span class="sourceLineNo">3103</span> }<a name="line.3103"></a> -<span class="sourceLineNo">3104</span> }<a name="line.3104"></a> -<span class="sourceLineNo">3105</span><a name="line.3105"></a> -<span class="sourceLineNo">3106</span> OperationStatus[] statuses = new OperationStatus[mutations.length];<a name="line.3106"></a> -<span class="sourceLineNo">3107</span> for (int i = 0; i < statuses.length; i++) {<a name="line.3107"></a> -<span class="sourceLineNo">3108</span> statuses[i] = OperationStatus.SUCCESS;<a name="line.3108"></a> -<span class="sourceLineNo">3109</span> }<a name="line.3109"></a> -<span class="sourceLineNo">3110</span> return statuses;<a name="line.3110"></a> -<span class="sourceLineNo">3111</span> }<a name="line.3111"></a> -<span class="sourceLineNo">3112</span> return batchMutate(new ReplayBatch(mutations, replaySeqId));<a name="line.3112"></a> -<span class="sourceLineNo">3113</span> }<a name="line.3113"></a> -<span class="sourceLineNo">3114</span><a name="line.3114"></a> -<span class="sourceLineNo">3115</span> /**<a name="line.3115"></a> -<span class="sourceLineNo">3116</span> * Perform a batch of mutations.<a name="line.3116"></a> -<span class="sourceLineNo">3117</span> * It supports only Put and Delete mutations and will ignore other types passed.<a name="line.3117"></a> -<span class="sourceLineNo">3118</span> * @param batchOp contains the list of mutations<a name="line.3118"></a> -<span class="sourceLineNo">3119</span> * @return an array of OperationStatus which internally contains the<a name="line.3119"></a> -<span class="sourceLineNo">3120</span> * OperationStatusCode and the exceptionMessage if any.<a name="line.3120"></a> -<span class="sourceLineNo">3121</span> * @throws IOException<a name="line.3121"></a> -<span class="sourceLineNo">3122</span> */<a name="line.3122"></a> -<span class="sourceLineNo">3123</span> OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {<a name="line.3123"></a> -<span class="sourceLineNo">3124</span> boolean initialized = false;<a name="line.3124"></a> -<span class="sourceLineNo">3125</span> Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;<a name="line.3125"></a> -<span class="sourceLineNo">3126</span> startRegionOperation(op);<a name="line.3126"></a> -<span class="sourceLineNo">3127</span> try {<a name="line.3127"></a> -<span class="sourceLineNo">3128</span> while (!batchOp.isDone()) {<a name="line.3128"></a> -<span class="sourceLineNo">3129</span> if (!batchOp.isInReplay()) {<a name="line.3129"></a> -<span class="sourceLineNo">3130</span> checkReadOnly();<a name="line.3130"></a> -<span class="sourceLineNo">3131</span> }<a name="line.3131"></a> -<span class="sourceLineNo">3132</span> checkResources();<a name="line.3132"></a> -<span class="sourceLineNo">3133</span><a name="line.3133"></a> -<span class="sourceLineNo">3134</span> if (!initialized) {<a name="line.3134"></a> -<span class="sourceLineNo">3135</span> this.writeRequestsCount.add(batchOp.operations.length);<a name="line.3135"></a> -<span class="sourceLineNo">3136</span> if (!batchOp.isInReplay()) {<a name="line.3136"></a> -<span class="sourceLineNo">3137</span> doPreBatchMutateHook(batchOp);<a name="line.3137"></a> -<span class="sourceLineNo">3138</span> }<a name="line.3138"></a> -<span class="sourceLineNo">3139</span> initialized = true;<a name="line.3139"></a> -<span class="sourceLineNo">3140</span> }<a name="line.3140"></a> -<span class="sourceLineNo">3141</span> doMiniBatchMutate(batchOp);<a name="line.3141"></a> -<span class="sourceLineNo">3142</span> long newSize = this.getMemstoreSize();<a name="line.3142"></a> -<span class="sourceLineNo">3143</span> requestFlushIfNeeded(newSize);<a name="line.3143"></a> -<span class="sourceLineNo">3144</span> }<a name="line.3144"></a> -<span class="sourceLineNo">3145</span> } finally {<a name="line.3145"></a> -<span class="sourceLineNo">3146</span> closeRegionOperation(op);<a name="line.3146"></a> -<span class="sourceLineNo">3147</span> }<a name="line.3147"></a> -<span class="sourceLineNo">3148</span> return batchOp.retCodeDetails;<a name="line.3148"></a> -<span class="sourceLineNo">3149</span> }<a name="line.3149"></a> -<span class="sourceLineNo">3150</span><a name="line.3150"></a> -<span class="sourceLineNo">3151</span> private void doPreBatchMutateHook(BatchOperation<?> batchOp)<a name="line.3151"></a> -<span class="sourceLineNo">3152</span> throws IOException {<a name="line.3152"></a> -<span class="sourceLineNo">3153</span> /* Run coprocessor pre hook outside of locks to avoid deadlock */<a name="line.3153"></a> -<span class="sourceLineNo">3154</span> WALEdit walEdit = new WALEdit();<a name="line.3154"></a> -<span class="sourceLineNo">3155</span> if (coprocessorHost != null) {<a name="line.3155"></a> -<span class="sourceLineNo">3156</span> for (int i = 0 ; i < batchOp.operations.length; i++) {<a name="line.3156"></a> -<span class="sourceLineNo">3157</span> Mutation m = batchOp.getMutation(i);<a name="line.3157"></a> -<span class="sourceLineNo">3158</span> if (m instanceof Put) {<a name="line.3158"></a> -<span class="sourceLineNo">3159</span> if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {<a name="line.3159"></a> -<span class="sourceLineNo">3160</span> // pre hook says skip this Put<a name="line.3160"></a> -<span class="sourceLineNo">3161</span> // mark as success and skip in doMiniBatchMutation<a name="line.3161"></a> -<span class="sourceLineNo">3162</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3162"></a> -<span class="sourceLineNo">3163</span> }<a name="line.3163"></a> -<span class="sourceLineNo">3164</span> } else if (m instanceof Delete) {<a name="line.3164"></a> -<span class="sourceLineNo">3165</span> Delete curDel = (Delete) m;<a name="line.3165"></a> -<span class="sourceLineNo">3166</span> if (curDel.getFamilyCellMap().isEmpty()) {<a name="line.3166"></a> -<span class="sourceLineNo">3167</span> // handle deleting a row case<a name="line.3167"></a> -<span class="sourceLineNo">3168</span> prepareDelete(curDel);<a name="line.3168"></a> -<span class="sourceLineNo">3169</span> }<a name="line.3169"></a> -<span class="sourceLineNo">3170</span> if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {<a name="line.3170"></a> -<span class="sourceLineNo">3171</span> // pre hook says skip this Delete<a name="line.3171"></a> -<span class="sourceLineNo">3172</span> // mark as success and skip in doMiniBatchMutation<a name="line.3172"></a> -<span class="sourceLineNo">3173</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3173"></a> -<span class="sourceLineNo">3174</span> }<a name="line.3174"></a> -<span class="sourceLineNo">3175</span> } else {<a name="line.3175"></a> -<span class="sourceLineNo">3176</span> // In case of passing Append mutations along with the Puts and Deletes in batchMutate<a name="line.3176"></a> -<span class="sourceLineNo">3177</span> // mark the operation return code as failure so that it will not be considered in<a name="line.3177"></a> -<span class="sourceLineNo">3178</span> // the doMiniBatchMutation<a name="line.3178"></a> -<span class="sourceLineNo">3179</span> batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,<a name="line.3179"></a> -<span class="sourceLineNo">3180</span> "Put/Delete mutations only supported in batchMutate() now");<a name="line.3180"></a> -<span class="sourceLineNo">3181</span> }<a name="line.3181"></a> -<span class="sourceLineNo">3182</span> if (!walEdit.isEmpty()) {<a name="line.3182"></a> -<span class="sourceLineNo">3183</span> batchOp.walEditsFromCoprocessors[i] = walEdit;<a name="line.3183"></a> -<span class="sourceLineNo">3184</span> walEdit = new WALEdit();<a name="line.3184"></a> -<span class="sourceLineNo">3185</span> }<a name="line.3185"></a> -<span class="sourceLineNo">3186</span> }<a name="line.3186"></a> -<span class="sourceLineNo">3187</span> }<a name="line.3187"></a> -<span class="sourceLineNo">3188</span> }<a name="line.3188"></a> -<span class="sourceLineNo">3189</span><a name="line.3189"></a> -<span class="sourceLineNo">3190</span> /**<a name="line.3190"></a> -<span class="sourceLineNo">3191</span> * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}<a name="line.3191"></a> -<span class="sourceLineNo">3192</span> * In here we also handle replay of edits on region recover.<a name="line.3192"></a> -<span class="sourceLineNo">3193</span> * @return Change in size brought about by applying <code>batchOp</code><a name="line.3193"></a> -<span class="sourceLineNo">3194</span> */<a name="line.3194"></a> -<span class="sourceLineNo">3195</span> @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK",<a name="line.3195"></a> -<span class="sourceLineNo">3196</span> justification="Findbugs seems to be confused on this.")<a name="line.3196"></a> -<span class="sourceLineNo">3197</span> @SuppressWarnings("unchecked")<a name="line.3197"></a> -<span class="sourceLineNo">3198</span> // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120<a name="line.3198"></a> -<span class="sourceLineNo">3199</span> private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {<a name="line.3199"></a> -<span class="sourceLineNo">3200</span> boolean replay = batchOp.isInReplay();<a name="line.3200"></a> -<span class="sourceLineNo">3201</span> long currentNonceGroup = HConstants.NO_NONCE;<a name="line.3201"></a> -<span class="sourceLineNo">3202</span> long currentNonce = HConstants.NO_NONCE;<a name="line.3202"></a> -<span class="sourceLineNo">3203</span> WALEdit walEdit = null;<a name="line.3203"></a> -<span class="sourceLineNo">3204</span> boolean locked = false;<a name="line.3204"></a> -<span class="sourceLineNo">3205</span> // reference family maps directly so coprocessors can mutate them if desired<a name="line.3205"></a> -<span class="sourceLineNo">3206</span> Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];<a name="line.3206"></a> -<span class="sourceLineNo">3207</span> // We try to set up a batch in the range [firstIndex,lastIndexExclusive)<a name="line.3207"></a> -<span class="sourceLineNo">3208</span> int firstIndex = batchOp.nextIndexToProcess;<a name="line.3208"></a> -<span class="sourceLineNo">3209</span> int lastIndexExclusive = firstIndex;<a name="line.3209"></a> -<span class="sourceLineNo">3210</span> boolean success = false;<a name="line.3210"></a> -<span class="sourceLineNo">3211</span> int noOfPuts = 0;<a name="line.3211"></a> -<span class="sourceLineNo">3212</span> int noOfDeletes = 0;<a name="line.3212"></a> -<span class="sourceLineNo">3213</span> WriteEntry writeEntry = null;<a name="line.3213"></a> -<span class="sourceLineNo">3214</span> int cellCount = 0;<a name="line.3214"></a> -<span class="sourceLineNo">3215</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.3215"></a> -<span class="sourceLineNo">3216</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.3216"></a> -<span class="sourceLineNo">3217</span> MemstoreSize memstoreSize = new MemstoreSize();<a name="line.3217"></a> -<span class="sourceLineNo">3218</span> final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();<a name="line.3218"></a> -<span class="sourceLineNo">3219</span> try {<a name="line.3219"></a> -<span class="sourceLineNo">3220</span> // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.<a name="line.3220"></a> -<span class="sourceLineNo">3221</span> int numReadyToWrite = 0;<a name="line.3221"></a> -<span class="sourceLineNo">3222</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.3222"></a> -<span class="sourceLineNo">3223</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.3223"></a> -<span class="sourceLineNo">3224</span> if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) {<a name="line.3224"></a> -<span class="sourceLineNo">3225</span> lastIndexExclusive++;<a name="line.3225"></a> -<span class="sourceLineNo">3226</span> continue;<a name="line.3226"></a> -<span class="sourceLineNo">3227</span> }<a name="line.3227"></a> -<span class="sourceLineNo">3228</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3228"></a> -<span class="sourceLineNo">3229</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3229"></a> -<span class="sourceLineNo">3230</span> RowLock rowLock = null;<a name="line.3230"></a> -<span class="sourceLineNo">3231</span> try {<a name="line.3231"></a> -<span class="sourceLineNo">3232</span> rowLock = getRowLockInternal(mutation.getRow(), true);<a name="line.3232"></a> -<span class="sourceLineNo">3233</span> } catch (TimeoutIOException e) {<a name="line.3233"></a> -<span class="sourceLineNo">3234</span> // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3234"></a> -<span class="sourceLineNo">3235</span> throw e;<a name="line.3235"></a> -<span class="sourceLineNo">3236</span> } catch (IOException ioe) {<a name="line.3236"></a> -<span class="sourceLineNo">3237</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3237"></a> -<span class="sourceLineNo">3238</span> }<a name="line.3238"></a> -<span class="sourceLineNo">3239</span> if (rowLock == null) {<a name="line.3239"></a> -<span class="sourceLineNo">3240</span> // We failed to grab another lock<a name="line.3240"></a> -<span class="sourceLineNo">3241</span> break; // Stop acquiring more rows for this batch<a name="line.3241"></a> -<span class="sourceLineNo">3242</span> } else {<a name="line.3242"></a> -<span class="sourceLineNo">3243</span> acquiredRowLocks.add(rowLock);<a name="line.3243"></a> -<span class="sourceLineNo">3244</span> }<a name="line.3244"></a> -<span class="sourceLineNo">3245</span><a name="line.3245"></a> -<span class="sourceLineNo">3246</span> lastIndexExclusive++;<a name="line.3246"></a> -<span class="sourceLineNo">3247</span> numReadyToWrite++;<a name="line.3247"></a> -<span class="sourceLineNo">3248</span> if (replay) {<a name="line.3248"></a> -<span class="sourceLineNo">3249</span> for (List<Cell> cells : mutation.getFamilyCellMap().values()) {<a name="line.3249"></a> -<span class="sourceLineNo">3250</span> cellCount += cells.size();<a name="line.3250"></a> -<span class="sourceLineNo">3251</span> }<a name="line.3251"></a> -<span class="sourceLineNo">3252</span> }<a name="line.3252"></a> -<span class="sourceLineNo">3253</span> }<a name="line.3253"></a> -<span class="sourceLineNo">3254</span><a name="line.3254"></a> -<span class="sourceLineNo">3255</span> // We've now grabbed as many mutations off the list as we can<a name="line.3255"></a> -<span class="sourceLineNo">3256</span><a name="line.3256"></a> -<span class="sourceLineNo">3257</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3257"></a> -<span class="sourceLineNo">3258</span> // We should record the timestamp only after we have acquired the rowLock,<a name="line.3258"></a> -<span class="sourceLineNo">3259</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3259"></a> -<span class="sourceLineNo">3260</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3260"></a> -<span class="sourceLineNo">3261</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3261"></a> -<span class="sourceLineNo">3262</span><a name="line.3262"></a> -<span class="sourceLineNo">3263</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3263"></a> -<span class="sourceLineNo">3264</span> if (numReadyToWrite <= 0) {<a name="line.3264"></a> -<span class="sourceLineNo">3265</span> return;<a name="line.3265"></a> -<span class="sourceLineNo">3266</span> }<a name="line.3266"></a> -<span class="sourceLineNo">3267</span><a name="line.3267"></a> -<span class="sourceLineNo">3268</span> for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {<a name="line.3268"></a> -<span class="sourceLineNo">3269</span> // skip invalid<a name="line.3269"></a> -<span class="sourceLineNo">3270</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3270"></a> -<span class="sourceLineNo">3271</span> != OperationStatusCode.NOT_RUN) {<a name="line.3271"></a> -<span class="sourceLineNo">3272</span> // lastIndexExclusive was incremented above.<a name="line.3272"></a> -<span class="sourceLineNo">3273</span> continue;<a name="line.3273"></a> -<span class="sourceLineNo">3274</span> }<a name="line.3274"></a> -<span class="sourceLineNo">3275</span><a name="line.3275"></a> -<span class="sourceLineNo">3276</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3276"></a> -<span class="sourceLineNo">3277</span> if (mutation instanceof Put) {<a name="line.3277"></a> -<span class="sourceLineNo">3278</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3278"></a> -<span class="sourceLineNo">3279</span> noOfPuts++;<a name="line.3279"></a> -<span class="sourceLineNo">3280</span> } else {<a name="line.3280"></a> -<span class="sourceLineNo">3281</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3281"></a> -<span class="sourceLineNo">3282</span> noOfDeletes++;<a name="line.3282"></a> -<span class="sourceLineNo">3283</span> }<a name="line.3283"></a> -<span class="sourceLineNo">3284</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3284"></a> -<span class="sourceLineNo">3285</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3285"></a> -<span class="sourceLineNo">3286</span> if (fromCP != null) {<a name="line.3286"></a> -<span class="sourceLineNo">3287</span> cellCount += fromCP.size();<a name="line.3287"></a> -<span class="sourceLineNo">3288</span> }<a name="line.3288"></a> -<span class="sourceLineNo">3289</span> if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {<a name="line.3289"></a> -<span class="sourceLineNo">3290</span> for (List<Cell> cells : familyMaps[i].values()) {<a name="line.3290"></a> -<span class="sourceLineNo">3291</span> cellCount += cells.size();<a name="line.3291"></a> -<span class="sourceLineNo">3292</span> }<a name="line.3292"></a> -<span class="sourceLineNo">3293</span> }<a name="line.3293"></a> -<span class="sourceLineNo">3294</span> }<a name="line.3294"></a> -<span class="sourceLineNo">3295</span> lock(this.updatesLock.readLock(), numReadyToWrite);<a name="line.3295"></a> -<span class="sourceLineNo">3296</span> locked = true;<a name="line.3296"></a> -<span class="sourceLineNo">3297</span><a name="line.3297"></a> -<span class="sourceLineNo">3298</span> // calling the pre CP hook for batch mutation<a name="line.3298"></a> -<span class="sourceLineNo">3299</span> if (!replay && coprocessorHost != null) {<a name="line.3299"></a> -<span class="sourceLineNo">3300</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3300"></a> -<span class="sourceLineNo">3301</span> new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),<a name="line.3301"></a> -<span class="sourceLineNo">3302</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3302"></a> -<span class="sourceLineNo">3303</span> if (coprocessorHost.preBatchMutate(miniBatchOp)) {<a name="line.3303"></a> -<span class="sourceLineNo">3304</span> return;<a name="line.3304"></a> -<span class="sourceLineNo">3305</span> } else {<a name="line.3305"></a> -<span class="sourceLineNo">3306</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3306"></a> -<span class="sourceLineNo">3307</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3307"></a> -<span class="sourceLineNo">3308</span> // lastIndexExclusive was incremented above.<a name="line.3308"></a> -<span class="sourceLineNo">3309</span> continue;<a name="line.3309"></a> -<span class="sourceLineNo">3310</span> }<a name="line.3310"></a> -<span class="sourceLineNo">3311</span> // we pass (i - firstIndex) below since the call expects a relative index<a name="line.3311"></a> -<span class="sourceLineNo">3312</span> Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);<a name="line.3312"></a> -<span class="sourceLineNo">3313</span> if (cpMutations == null) {<a name="line.3313"></a> -<span class="sourceLineNo">3314</span> continue;<a name="line.3314"></a> -<span class="sourceLineNo">3315</span> }<a name="line.3315"></a> -<span class="sourceLineNo">3316</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3316"></a> -<span class="sourceLineNo">3317</span> boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;<a name="line.3317"></a> -<span class="sourceLineNo">3318</span> // Else Coprocessor added more Mutations corresponding to the Mutation at this index.<a name="line.3318"></a> -<span class="sourceLineNo">3319</span> for (int j = 0; j < cpMutations.length; j++) {<a name="line.3319"></a> -<span class="sourceLineNo">3320</span> Mutation cpMutation = cpMutations[j];<a name="line.3320"></a> -<span class="sourceLineNo">3321</span> Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();<a name="line.3321"></a> -<span class="sourceLineNo">3322</span> checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now);<a name="line.3322"></a> -<span class="sourceLineNo">3323</span><a name="line.3323"></a> -<span class="sourceLineNo">3324</span> // Acquire row locks. If not, the whole batch will fail.<a name="line.3324"></a> -<span class="sourceLineNo">3325</span> acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));<a name="line.3325"></a> +<span class="sourceLineNo">2861</span> // All edits for the given row (across all column families) must happen atomically.<a name="line.2861"></a> +<span class="sourceLineNo">2862</span> doBatchMutate(delete);<a name="line.2862"></a> +<span class="sourceLineNo">2863</span> } finally {<a name="line.2863"></a> +<span class="sourceLineNo">2864</span> closeRegionOperation(Operation.DELETE);<a name="line.2864"></a> +<span class="sourceLineNo">2865</span> }<a name="line.2865"></a> +<span class="sourceLineNo">2866</span> }<a name="line.2866"></a> +<span class="sourceLineNo">2867</span><a name="line.2867"></a> +<span class="sourceLineNo">2868</span> /**<a name="line.2868"></a> +<span class="sourceLineNo">2869</span> * Row needed by below method.<a name="line.2869"></a> +<span class="sourceLineNo">2870</span> */<a name="line.2870"></a> +<span class="sourceLineNo">2871</span> private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");<a name="line.2871"></a> +<span class="sourceLineNo">2872</span><a name="line.2872"></a> +<span class="sourceLineNo">2873</span> /**<a name="line.2873"></a> +<span class="sourceLineNo">2874</span> * This is used only by unit tests. Not required to be a public API.<a name="line.2874"></a> +<span class="sourceLineNo">2875</span> * @param familyMap map of family to edits for the given family.<a name="line.2875"></a> +<span class="sourceLineNo">2876</span> * @throws IOException<a name="line.2876"></a> +<span class="sourceLineNo">2877</span> */<a name="line.2877"></a> +<span class="sourceLineNo">2878</span> void delete(NavigableMap<byte[], List<Cell>> familyMap,<a name="line.2878"></a> +<span class="sourceLineNo">2879</span> Durability durability) throws IOException {<a name="line.2879"></a> +<span class="sourceLineNo">2880</span> Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);<a name="line.2880"></a> +<span class="sourceLineNo">2881</span> delete.setFamilyCellMap(familyMap);<a name="line.2881"></a> +<span class="sourceLineNo">2882</span> delete.setDurability(durability);<a name="line.2882"></a> +<span class="sourceLineNo">2883</span> doBatchMutate(delete);<a name="line.2883"></a> +<span class="sourceLineNo">2884</span> }<a name="line.2884"></a> +<span class="sourceLineNo">2885</span><a name="line.2885"></a> +<span class="sourceLineNo">2886</span> @Override<a name="line.2886"></a> +<span class="sourceLineNo">2887</span> public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,<a name="line.2887"></a> +<span class="sourceLineNo">2888</span> byte[] byteNow) throws IOException {<a name="line.2888"></a> +<span class="sourceLineNo">2889</span> for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {<a name="line.2889"></a> +<span class="sourceLineNo">2890</span><a name="line.2890"></a> +<span class="sourceLineNo">2891</span> byte[] family = e.getKey();<a name="line.2891"></a> +<span class="sourceLineNo">2892</span> List<Cell> cells = e.getValue();<a name="line.2892"></a> +<span class="sourceLineNo">2893</span> assert cells instanceof RandomAccess;<a name="line.2893"></a> +<span class="sourceLineNo">2894</span><a name="line.2894"></a> +<span class="sourceLineNo">2895</span> Map<byte[], Integer> kvCount = new TreeMap<>(Bytes.BYTES_COMPARATOR);<a name="line.2895"></a> +<span class="sourceLineNo">2896</span> int listSize = cells.size();<a name="line.2896"></a> +<span class="sourceLineNo">2897</span> for (int i=0; i < listSize; i++) {<a name="line.2897"></a> +<span class="sourceLineNo">2898</span> Cell cell = cells.get(i);<a name="line.2898"></a> +<span class="sourceLineNo">2899</span> // Check if time is LATEST, change to time of most recent addition if so<a name="line.2899"></a> +<span class="sourceLineNo">2900</span> // This is expensive.<a name="line.2900"></a> +<span class="sourceLineNo">2901</span> if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {<a name="line.2901"></a> +<span class="sourceLineNo">2902</span> byte[] qual = CellUtil.cloneQualifier(cell);<a name="line.2902"></a> +<span class="sourceLineNo">2903</span> if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;<a name="line.2903"></a> +<span class="sourceLineNo">2904</span><a name="line.2904"></a> +<span class="sourceLineNo">2905</span> Integer count = kvCount.get(qual);<a name="line.2905"></a> +<span class="sourceLineNo">2906</span> if (count == null) {<a name="line.2906"></a> +<span class="sourceLineNo">2907</span> kvCount.put(qual, 1);<a name="line.2907"></a> +<span class="sourceLineNo">2908</span> } else {<a name="line.2908"></a> +<span class="sourceLineNo">2909</span> kvCount.put(qual, count + 1);<a name="line.2909"></a> +<span class="sourceLineNo">2910</span> }<a name="line.2910"></a> +<span class="sourceLineNo">2911</span> count = kvCount.get(qual);<a name="line.2911"></a> +<span class="sourceLineNo">2912</span><a name="line.2912"></a> +<span class="sourceLineNo">2913</span> Get get = new Get(CellUtil.cloneRow(cell));<a name="line.2913"></a> +<span class="sourceLineNo">2914</span> get.setMaxVersions(count);<a name="line.2914"></a> +<span class="sourceLineNo">2915</span> get.addColumn(family, qual);<a name="line.2915"></a> +<span class="sourceLineNo">2916</span> if (coprocessorHost != null) {<a name="line.2916"></a> +<span class="sourceLineNo">2917</span> if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,<a name="line.2917"></a> +<span class="sourceLineNo">2918</span> byteNow, get)) {<a name="line.2918"></a> +<span class="sourceLineNo">2919</span> updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);<a name="line.2919"></a> +<span class="sourceLineNo">2920</span> }<a name="line.2920"></a> +<span class="sourceLineNo">2921</span> } else {<a name="line.2921"></a> +<span class="sourceLineNo">2922</span> updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);<a name="line.2922"></a> +<span class="sourceLineNo">2923</span> }<a name="line.2923"></a> +<span class="sourceLineNo">2924</span> } else {<a name="line.2924"></a> +<span class="sourceLineNo">2925</span> CellUtil.updateLatestStamp(cell, byteNow, 0);<a name="line.2925"></a> +<span class="sourceLineNo">2926</span> }<a name="line.2926"></a> +<span class="sourceLineNo">2927</span> }<a name="line.2927"></a> +<span class="sourceLineNo">2928</span> }<a name="line.2928"></a> +<span class="sourceLineNo">2929</span> }<a name="line.2929"></a> +<span class="sourceLineNo">2930</span><a name="line.2930"></a> +<span class="sourceLineNo">2931</span> void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)<a name="line.2931"></a> +<span class="sourceLineNo">2932</span> throws IOException {<a name="line.2932"></a> +<span class="sourceLineNo">2933</span> List<Cell> result = get(get, false);<a name="line.2933"></a> +<span class="sourceLineNo">2934</span><a name="line.2934"></a> +<span class="sourceLineNo">2935</span> if (result.size() < count) {<a name="line.2935"></a> +<span class="sourceLineNo">2936</span> // Nothing to delete<a name="line.2936"></a> +<span class="sourceLineNo">2937</span> CellUtil.updateLatestStamp(cell, byteNow, 0);<a name="line.2937"></a> +<span class="sourceLineNo">2938</span> return;<a name="line.2938"></a> +<span class="sourceLineNo">2939</span> }<a name="line.2939"></a> +<span class="sourceLineNo">2940</span> if (result.size() > count) {<a name="line.2940"></a> +<span class="sourceLineNo">2941</span> throw new RuntimeException("Unexpected size: " + result.size());<a name="line.2941"></a> +<span class="sourceLineNo">2942</span> }<a name="line.2942"></a> +<span class="sourceLineNo">2943</span> Cell getCell = result.get(count - 1);<a name="line.2943"></a> +<span class="sourceLineNo">2944</span> CellUtil.setTimestamp(cell, getCell.getTimestamp());<a name="line.2944"></a> +<span class="sourceLineNo">2945</span> }<a name="line.2945"></a> +<span class="sourceLineNo">2946</span><a name="line.2946"></a> +<span class="sourceLineNo">2947</span> @Override<a name="line.2947"></a> +<span class="sourceLineNo">2948</span> public void put(Put put) throws IOException {<a name="line.2948"></a> +<span class="sourceLineNo">2949</span> checkReadOnly();<a name="line.2949"></a> +<span class="sourceLineNo">2950</span><a name="line.2950"></a> +<span class="sourceLineNo">2951</span> // Do a rough check that we have resources to accept a write. The check is<a name="line.2951"></a> +<span class="sourceLineNo">2952</span> // 'rough' in that between the resource check and the call to obtain a<a name="line.2952"></a> +<span class="sourceLineNo">2953</span> // read lock, resources may run out. For now, the thought is that this<a name="line.2953"></a> +<span class="sourceLineNo">2954</span> // will be extremely rare; we'll deal with it when it happens.<a name="line.2954"></a> +<span class="sourceLineNo">2955</span> checkResources();<a name="line.2955"></a> +<span class="sourceLineNo">2956</span> startRegionOperation(Operation.PUT);<a name="line.2956"></a> +<span class="sourceLineNo">2957</span> try {<a name="line.2957"></a> +<span class="sourceLineNo">2958</span> // All edits for the given row (across all column families) must happen atomically.<a name="line.2958"></a> +<span class="sourceLineNo">2959</span> doBatchMutate(put);<a name="line.2959"></a> +<span class="sourceLineNo">2960</span> } finally {<a name="line.2960"></a> +<span class="sourceLineNo">2961</span> closeRegionOperation(Operation.PUT);<a name="line.2961"></a> +<span class="sourceLineNo">2962</span> }<a name="line.2962"></a> +<span class="sourceLineNo">2963</span> }<a name="line.2963"></a> +<span class="sourceLineNo">2964</span><a name="line.2964"></a> +<span class="sourceLineNo">2965</span> /**<a name="line.2965"></a> +<span class="sourceLineNo">2966</span> * Struct-like class that tracks the progress of a batch operation,<a name="line.2966"></a> +<span class="sourceLineNo">2967</span> * accumulating status codes and tracking the index at which processing<a name="line.2967"></a> +<span class="sourceLineNo">2968</span> * is proceeding.<a name="line.2968"></a> +<span class="sourceLineNo">2969</span> */<a name="line.2969"></a> +<span class="sourceLineNo">2970</span> private abstract static class BatchOperation<T> {<a name="line.2970"></a> +<span class="sourceLineNo">2971</span> T[] operations;<a name="line.2971"></a> +<span class="sourceLineNo">2972</span> int nextIndexToProcess = 0;<a name="line.2972"></a> +<span class="sourceLineNo">2973</span> OperationStatus[] retCodeDetails;<a name="line.2973"></a> +<span class="sourceLineNo">2974</span> WALEdit[] walEditsFromCoprocessors;<a name="line.2974"></a> +<span class="sourceLineNo">2975</span><a name="line.2975"></a> +<span class="sourceLineNo">2976</span> public BatchOperation(T[] operations) {<a name="line.2976"></a> +<span class="sourceLineNo">2977</span> this.operations = operations;<a name="line.2977"></a> +<span class="sourceLineNo">2978</span> this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2978"></a> +<span class="sourceLineNo">2979</span> this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2979"></a> +<span class="sourceLineNo">2980</span> Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2980"></a> +<span class="sourceLineNo">2981</span> }<a name="line.2981"></a> +<span class="sourceLineNo">2982</span><a name="line.2982"></a> +<span class="sourceLineNo">2983</span> public abstract Mutation getMutation(int index);<a name="line.2983"></a> +<span class="sourceLineNo">2984</span> public abstract long getNonceGroup(int index);<a name="line.2984"></a> +<span class="sourceLineNo">2985</span> public abstract long getNonce(int index);<a name="line.2985"></a> +<span class="sourceLineNo">2986</span> /** This method is potentially expensive and should only be used for non-replay CP path. */<a name="line.2986"></a> +<span class="sourceLineNo">2987</span> public abstract Mutation[] getMutationsForCoprocs();<a name="line.2987"></a> +<span class="sourceLineNo">2988</span> public abstract boolean isInReplay();<a name="line.2988"></a> +<span class="sourceLineNo">2989</span> public abstract long getReplaySequenceId();<a name="line.2989"></a> +<span class="sourceLineNo">2990</span><a name="line.2990"></a> +<span class="sourceLineNo">2991</span> public boolean isDone() {<a name="line.2991"></a> +<span class="sourceLineNo">2992</span> return nextIndexToProcess == operations.length;<a name="line.2992"></a> +<span class="sourceLineNo">2993</span> }<a name="line.2993"></a> +<span class="sourceLineNo">2994</span> }<a name="line.2994"></a> +<span class="sourceLineNo">2995</span><a name="line.2995"></a> +<span class="sourceLineNo">2996</span> private static class MutationBatch extends BatchOperation<Mutation> {<a name="line.2996"></a> +<span class="sourceLineNo">2997</span> private long nonceGroup;<a name="line.2997"></a> +<span class="sourceLineNo">2998</span> private long nonce;<a name="line.2998"></a> +<span class="sourceLineNo">2999</span> public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {<a name="line.2999"></a> +<span class="sourceLineNo">3000</span> super(operations);<a name="line.3000"></a> +<span class="sourceLineNo">3001</span> this.nonceGroup = nonceGroup;<a name="line.3001"></a> +<span class="sourceLineNo">3002</span> this.nonce = nonce;<a name="line.3002"></a> +<span class="sourceLineNo">3003</span> }<a name="line.3003"></a> +<span class="sourceLineNo">3004</span><a name="line.3004"></a> +<span class="sourceLineNo">3005</span> @Override<a name="line.3005"></a> +<span class="sourceLineNo">3006</span> public Mutation getMutation(int index) {<a name="line.3006"></a> +<span class="sourceLineNo">3007</span> return this.operations[index];<a name="line.3007"></a> +<span class="sourceLineNo">3008</span> }<a name="line.3008"></a> +<span class="sourceLineNo">3009</span><a name="line.3009"></a> +<span class="sourceLineNo">3010</span> @Override<a name="line.3010"></a> +<span class="sourceLineNo">3011</span> public long getNonceGroup(int index) {<a name="line.3011"></a> +<span class="sourceLineNo">3012</span> return nonceGroup;<a name="line.3012"></a> +<span class="sourceLineNo">3013</span> }<a name="line.3013"></a> +<span class="sourceLineNo">3014</span><a name="line.3014"></a> +<span class="sourceLineNo">3015</span> @Override<a name="line.3015"></a> +<span class="sourceLineNo">3016</span> public long getNonce(int index) {<a name="line.3016"></a> +<span class="sourceLineNo">3017</span> return nonce;<a name="line.3017"></a> +<span class="sourceLineNo">3018</span> }<a name="line.3018"></a> +<span class="sourceLineNo">3019</span><a name="line.3019"></a> +<span class="sourceLineNo">3020</span> @Override<a name="line.3020"></a> +<span class="sourceLineNo">3021</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3021"></a> +<span class="sourceLineNo">3022</span> return this.operations;<a name="line.3022"></a> +<span class="sourceLineNo">3023</span> }<a name="line.3023"></a> +<span class="sourceLineNo">3024</span><a name="line.3024"></a> +<span class="sourceLineNo">3025</span> @Override<a name="line.3025"></a> +<span class="sourceLineNo">3026</span> public boolean isInReplay() {<a name="line.3026"></a> +<span class="sourceLineNo">3027</span> return false;<a name="line.3027"></a> +<span class="sourceLineNo">3028</span> }<a name="line.3028"></a> +<span class="sourceLineNo">3029</span><a name="line.3029"></a> +<span class="sourceLineNo">3030</span> @Override<a name="line.3030"></a> +<span class="sourceLineNo">3031</span> public long getReplaySequenceId() {<a name="line.3031"></a> +<span class="sourceLineNo">3032</span> return 0;<a name="line.3032"></a> +<span class="sourceLineNo">3033</span> }<a name="line.3033"></a> +<span class="sourceLineNo">3034</span> }<a name="line.3034"></a> +<span class="sourceLineNo">3035</span><a name="line.3035"></a> +<span class="sourceLineNo">3036</span> private static class ReplayBatch extends BatchOperation<MutationReplay> {<a name="line.3036"></a> +<span class="sourceLineNo">3037</span> private long replaySeqId = 0;<a name="line.3037"></a> +<span class="sourceLineNo">3038</span> public ReplayBatch(MutationReplay[] operations, long seqId) {<a name="line.3038"></a> +<span class="sourceLineNo">3039</span> super(operations);<a name="line.3039"></a> +<span class="sourceLineNo">3040</span> this.replaySeqId = seqId;<a name="line.3040"></a> +<span class="sourceLineNo">3041</span> }<a name="line.3041"></a> +<span class="sourceLineNo">3042</span><a name="line.3042"></a> +<span class="sourceLineNo">3043</span> @Override<a name="line.3043"></a> +<span class="sourceLineNo">3044</span> public Mutation getMutation(int index) {<a name="line.3044"></a> +<span class="sourceLineNo">3045</span> return this.operations[index].mutation;<a name="line.3045"></a> +<span class="sourceLineNo">3046</span> }<a name="line.3046"></a> +<span class="sourceLineNo">3047</span><a name="line.3047"></a> +<span class="sourceLineNo">3048</span> @Override<a name="line.3048"></a> +<span class="sourceLineNo">3049</span> public long getNonceGroup(int index) {<a name="line.3049"></a> +<span class="sourceLineNo">3050</span> return this.operations[index].nonceGroup;<a name="line.3050"></a> +<span class="sourceLineNo">3051</span> }<a name="line.3051"></a> +<span class="sourceLineNo">3052</span><a name="line.3052"></a> +<span class="sourceLineNo">3053</span> @Override<a name="line.3053"></a> +<span class="sourceLineNo">3054</span> public long getNonce(int index) {<a name="line.3054"></a> +<span class="sourceLineNo">3055</span> return this.operations[index].nonce;<a name="line.3055"></a> +<span class="sourceLineNo">3056</span> }<a name="line.3056"></a> +<span class="sourceLineNo">3057</span><a name="line.3057"></a> +<span class="sourceLineNo">3058</span> @Override<a name="line.3058"></a> +<span class="sourceLineNo">3059</span> public Mutation[] getMutationsForCoprocs() {<a name="line.3059"></a> +<span class="sourceLineNo">3060</span> assert false;<a name="line.3060"></a> +<span class="sourceLineNo">3061</span> throw new RuntimeException("Should not be called for replay batch");<a name="line.3061"></a> +<span class="sourceLineNo">3062</span> }<a name="line.3062"></a> +<span class="sourceLineNo">3063</span><a name="line.3063"></a> +<span class="sourceLineNo">3064</span> @Override<a name="line.3064"></a> +<span class="sourceLineNo">3065</span> public boolean isInReplay() {<a name="line.3065"></a> +<span class="sourceLineNo">3066</span> return true;<a name="line.3066"></a> +<span class="sourceLineNo">3067</span> }<a name="line.3067"></a> +<span class="sourceLineNo">3068</span><a name="line.3068"></a> +<span class="sourceLineNo">3069</span> @Override<a name="line.3069"></a> +<span class="sourceLineNo">3070</span> public long getReplaySequenceId() {<a name="line.3070"></a> +<span class="sourceLineNo">3071</span> return this.replaySeqId;<a name="line.3071"></a> +<span class="sourceLineNo">3072</span> }<a name="line.3072"></a> +<span class="sourceLineNo">3073</span> }<a name="line.3073"></a> +<span class="sourceLineNo">3074</span><a name="line.3074"></a> +<span class="sourceLineNo">3075</span> @Override<a name="line.3075"></a> +<span class="sourceLineNo">3076</span> public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)<a name="line.3076"></a> +<span class="sourceLineNo">3077</span> throws IOException {<a name="line.3077"></a> +<span class="sourceLineNo">3078</span> // As it stands, this is used for 3 things<a name="line.3078"></a> +<span class="sourceLineNo">3079</span> // * batchMutate with single mutation - put/delete, separate or from checkAndMutate.<a name="line.3079"></a> +<span class="sourceLineNo">3080</span> // * coprocessor calls (see ex. BulkDeleteEndpoint).<a name="line.3080"></a> +<span class="sourceLineNo">3081</span> // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...<a name="line.3081"></a> +<span class="sourceLineNo">3082</span> return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));<a name="line.3082"></a> +<span class="sourceLineNo">3083</span> }<a name="line.3083"></a> +<span class="sourceLineNo">3084</span><a name="line.3084"></a> +<span class="sourceLineNo">3085</span> public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {<a name="line.3085"></a> +<span class="sourceLineNo">3086</span> return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.3086"></a> +<span class="sourceLineNo">3087</span> }<a name="line.3087"></a> +<span class="sourceLineNo">3088</span><a name="line.3088"></a> +<span class="sourceLineNo">3089</span> @Override<a name="line.3089"></a> +<span class="sourceLineNo">3090</span> public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)<a name="line.3090"></a> +<span class="sourceLineNo">3091</span> throws IOException {<a name="line.3091"></a> +<span class="sourceLineNo">3092</span> if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())<a name="line.3092"></a> +<span class="sourceLineNo">3093</span> && replaySeqId < lastReplayedOpenRegionSeqId) {<a name="line.3093"></a> +<span class="sourceLineNo">3094</span> // if it is a secondary replica we should ignore these entries silently<a name="line.3094"></a> +<span class="sourceLineNo">3095</span> // since they are coming out of order<a name="line.3095"></a> +<span class="sourceLineNo">3096</span> if (LOG.isTraceEnabled()) {<a name="line.3096"></a> +<span class="sourceLineNo">3097</span> LOG.trace(getRegionInfo().getEncodedName() + " : "<a name="line.3097"></a> +<span class="sourceLineNo">3098</span> + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId<a name="line.3098"></a> +<span class="sourceLineNo">3099</span> + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);<a name="line.3099"></a> +<span class="sourceLineNo">3100</span> for (MutationReplay mut : mutations) {<a name="line.3100"></a> +<span class="sourceLineNo">3101</span> LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);<a name="line.3101"></a> +<span class="sourceLineNo">3102</span> }<a name="line.3102"></a> +<span class="sourceLineNo">3103</span> }<a name="line.3103"></a> +<span class="sourceLineNo">3104</span><a name="line.3104"></a> +<span class="sourceLineNo">3105</span> OperationStatus[] statuses = new OperationStatus[mutations.length];<a name="line.3105"></a> +<span class="sourceLineNo">3106</span> for (int i = 0; i < statuses.length; i++) {<a name="line.3106"></a> +<span class="sourceLineNo">3107</span> statuses[i] = OperationStatus.SUCCESS;<a name="line.3107"></a> +<span class="sourceLineNo">3108</span> }<a name="line.3108"></a> +<span class="sourceLineNo">3109</span> return statuses;<a name="line.3109"></a> +<span class="sourceLineNo">3110</span> }<a name="line.3110"></a> +<span class="sourceLineNo">3111</span> return batchMutate(new ReplayBatch(mutations, replaySeqId));<a name="line.3111"></a> +<span class="sourceLineNo">3112</span> }<a name="line.3112"></a> +<span class="sourceLineNo">3113</span><a name="line.3113"></a> +<span class="sourceLineNo">3114</span> /**<a name="line.3114"></a> +<span class="sourceLineNo">3115</span> * Perform a batch of mutations.<a name="line.3115"></a> +<span class="sourceLineNo">3116</span> * It supports only Put and Delete mutations and will ignore other types passed.<a name="line.3116"></a> +<span class="sourceLineNo">3117</span> * @param batchOp contains the list of mutations<a name="line.3117"></a> +<span class="sourceLineNo">3118</span> * @return an array of OperationStatus which internally contains the<a name="line.3118"></a> +<span class="sourceLineNo">3119</span> * OperationStatusCode and the exceptionMessage if any.<a name="line.3119"></a> +<span class="sourceLineNo">3120</span> * @throws IOException<a name="line.3120"></a> +<span class="sourceLineNo">3121</span> */<a name="line.3121"></a> +<span class="sourceLineNo">3122</span> OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {<a name="line.3122"></a> +<span class="sourceLineNo">3123</span> boolean initialized = false;<a name="line.3123"></a> +<span class="sourceLineNo">3124</span> Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;<a name="line.3124"></a> +<span class="sourceLineNo">3125</span> startRegionOperation(op);<a name="line.3125"></a> +<span class="sourceLineNo">3126</span> try {<a name="line.3126"></a> +<span class="sourceLineNo">3127</span> while (!batchOp.isDone()) {<a name="line.3127"></a> +<span class="sourceLineNo">3128</span> if (!batchOp.isInReplay()) {<a name="line.3128"></a> +<span class="sourceLineNo">3129</span> checkReadOnly();<a name="line.3129"></a> +<span class="sourceLineNo">3130</span> }<a name="line.3130"></a> +<span class="sourceLineNo">3131</span> checkResources();<a name="line.3131"></a> +<span class="sourceLineNo">3132</span><a name="line.3132"></a> +<span class="sourceLineNo">3133</span> if (!initialized) {<a name="line.3133"></a> +<span class="sourceLineNo">3134</span> this.writeRequestsCount.add(batchOp.operations.length);<a name="line.3134"></a> +<span class="sourceLineNo">3135</span> if (!batchOp.isInReplay()) {<a name="line.3135"></a> +<span class="sourceLineNo">3136</span> doPreBatchMutateHook(batchOp);<a name="line.3136"></a> +<span class="sourceLineNo">3137</span> }<a name="line.3137"></a> +<span class="sourceLineNo">3138</span> initialized = true;<a name="line.3138"></a> +<span class="sourceLineNo">3139</span> }<a name="line.3139"></a> +<span class="sourceLineNo">3140</span> doMiniBatchMutate(batchOp);<a name="line.3140"></a> +<span class="sourceLineNo">3141</span> long newSize = this.getMemstoreSize();<a name="line.3141"></a> +<span class="sourceLineNo">3142</span> requestFlushIfNeeded(newSize);<a name="line.3142"></a> +<span class="sourceLineNo">3143</span> }<a name="line.3143"></a> +<span class="sourceLineNo">3144</span> } finally {<a name="line.3144"></a> +<span class="sourceLineNo">3145</span> closeRegionOperation(op);<a name="line.3145"></a> +<span class="sourceLineNo">3146</span> }<a name="line.3146"></a> +<span class="sourceLineNo">3147</span> return batchOp.retCodeDetails;<a name="line.3147"></a> +<span class="sourceLineNo">3148</span> }<a name="line.3148"></a> +<span class="sourceLineNo">3149</span><a name="line.3149"></a> +<span class="sourceLineNo">3150</span> private void doPreBatchMutateHook(BatchOperation<?> batchOp)<a name="line.3150"></a> +<span class="sourceLineNo">3151</span> throws IOException {<a name="line.3151"></a> +<span class="sourceLineNo">3152</span> /* Run coprocessor pre hook outside of locks to avoid deadlock */<a name="line.3152"></a> +<span class="sourceLineNo">3153</span> WALEdit walEdit = new WALEdit();<a name="line.3153"></a> +<span class="sourceLineNo">3154</span> if (coprocessorHost != null) {<a name="line.3154"></a> +<span class="sourceLineNo">3155</span> for (int i = 0 ; i < batchOp.operations.length; i++) {<a name="line.3155"></a> +<span class="sourceLineNo">3156</span> Mutation m = batchOp.getMutation(i);<a name="line.3156"></a> +<span class="sourceLineNo">3157</span> if (m instanceof Put) {<a name="line.3157"></a> +<span class="sourceLineNo">3158</span> if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {<a name="line.3158"></a> +<span class="sourceLineNo">3159</span> // pre hook says skip this Put<a name="line.3159"></a> +<span class="sourceLineNo">3160</span> // mark as success and skip in doMiniBatchMutation<a name="line.3160"></a> +<span class="sourceLineNo">3161</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3161"></a> +<span class="sourceLineNo">3162</span> }<a name="line.3162"></a> +<span class="sourceLineNo">3163</span> } else if (m instanceof Delete) {<a name="line.3163"></a> +<span class="sourceLineNo">3164</span> Delete curDel = (Delete) m;<a name="line.3164"></a> +<span class="sourceLineNo">3165</span> if (curDel.getFamilyCellMap().isEmpty()) {<a name="line.3165"></a> +<span class="sourceLineNo">3166</span> // handle deleting a row case<a name="line.3166"></a> +<span class="sourceLineNo">3167</span> prepareDelete(curDel);<a name="line.3167"></a> +<span class="sourceLineNo">3168</span> }<a name="line.3168"></a> +<span class="sourceLineNo">3169</span> if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {<a name="line.3169"></a> +<span class="sourceLineNo">3170</span> // pre hook says skip this Delete<a name="line.3170"></a> +<span class="sourceLineNo">3171</span> // mark as success and skip in doMiniBatchMutation<a name="line.3171"></a> +<span class="sourceLineNo">3172</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3172"></a> +<span class="sourceLineNo">3173</span> }<a name="line.3173"></a> +<span class="sourceLineNo">3174</span> } else {<a name="line.3174"></a> +<span class="sourceLineNo">3175</span> // In case of passing Append mutations along with the Puts and Deletes in batchMutate<a name="line.3175"></a> +<span class="sourceLineNo">3176</span> // mark the operation return code as failure so that it will not be considered in<a name="line.3176"></a> +<span class="sourceLineNo">3177</span> // the doMiniBatchMutation<a name="line.3177"></a> +<span class="sourceLineNo">3178</span> batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,<a name="line.3178"></a> +<span class="sourceLineNo">3179</span> "Put/Delete mutations only supported in batchMutate() now");<a name="line.3179"></a> +<span class="sourceLineNo">3180</span> }<a name="line.3180"></a> +<span class="sourceLineNo">3181</span> if (!walEdit.isEmpty()) {<a name="line.3181"></a> +<span class="sourceLineNo">3182</span> batchOp.walEditsFromCoprocessors[i] = walEdit;<a name="line.3182"></a> +<span class="sourceLineNo">3183</span> walEdit = new WALEdit();<a name="line.3183"></a> +<span class="sourceLineNo">3184</span> }<a name="line.3184"></a> +<span class="sourceLineNo">3185</span> }<a name="line.3185"></a> +<span class="sourceLineNo">3186</span> }<a name="line.3186"></a> +<span class="sourceLineNo">3187</span> }<a name="line.3187"></a> +<span class="sourceLineNo">3188</span><a name="line.3188"></a> +<span class="sourceLineNo">3189</span> /**<a name="line.3189"></a> +<span class="sourceLineNo">3190</span> * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}<a name="line.3190"></a> +<span class="sourceLineNo">3191</span> * In here we also handle replay of edits on region recover.<a name="line.3191"></a> +<span class="sourceLineNo">3192</span> * @return Change in size brought about by applying <code>batchOp</code><a name="line.3192"></a> +<span class="sourceLineNo">3193</span> */<a name="line.3193"></a> +<span class="sourceLineNo">3194</span> // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120<a name="line.3194"></a> +<span class="sourceLineNo">3195</span> private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {<a name="line.3195"></a> +<span class="sourceLineNo">3196</span> boolean replay = batchOp.isInReplay();<a name="line.3196"></a> +<span class="sourceLineNo">3197</span> long currentNonceGroup = HConstants.NO_NONCE;<a name="line.3197"></a> +<span class="sourceLineNo">3198</span> long currentNonce = HConstants.NO_NONCE;<a name="line.3198"></a> +<span class="sourceLineNo">3199</span> WALEdit walEdit = null;<a name="line.3199"></a> +<span class="sourceLineNo">3200</span> boolean locked = false;<a name="line.3200"></a> +<span class="sourceLineNo">3201</span> // reference family maps directly so coprocessors can mutate them if desired<a name="line.3201"></a> +<span class="sourceLineNo">3202</span> Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];<a name="line.3202"></a> +<span class="sourceLineNo">3203</span> // We try to set up a batch in the range [firstIndex,lastIndexExclusive)<a name="line.3203"></a> +<span class="sourceLineNo">3204</span> int firstIndex = batchOp.nextIndexToProcess;<a name="line.3204"></a> +<span class="sourceLineNo">3205</span> int lastIndexExclusive = firstIndex;<a name="line.3205"></a> +<span class="sourceLineNo">3206</span> boolean success = false;<a name="line.3206"></a> +<span class="sourceLineNo">3207</span> int noOfPuts = 0;<a name="line.3207"></a> +<span class="sourceLineNo">3208</span> int noOfDeletes = 0;<a name="line.3208"></a> +<span class="sourceLineNo">3209</span> WriteEntry writeEntry = null;<a name="line.3209"></a> +<span class="sourceLineNo">3210</span> int cellCount = 0;<a name="line.3210"></a> +<span class="sourceLineNo">3211</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.3211"></a> +<span class="sourceLineNo">3212</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.3212"></a> +<span class="sourceLineNo">3213</span> MemstoreSize memstoreSize = new MemstoreSize();<a name="line.3213"></a> +<span class="sourceLineNo">3214</span> final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();<a name="line.3214"></a> +<span class="sourceLineNo">3215</span> try {<a name="line.3215"></a> +<span class="sourceLineNo">3216</span> // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.<a name="line.3216"></a> +<span class="sourceLineNo">3217</span> int numReadyToWrite = 0;<a name="line.3217"></a> +<span class="sourceLineNo">3218</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.3218"></a> +<span class="sourceLineNo">3219</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.3219"></a> +<span class="sourceLineNo">3220</span> if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) {<a name="line.3220"></a> +<span class="sourceLineNo">3221</span> lastIndexExclusive++;<a name="line.3221"></a> +<span class="sourceLineNo">3222</span> continue;<a name="line.3222"></a> +<span class="sourceLineNo">3223</span> }<a name="line.3223"></a> +<span class="sourceLineNo">3224</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3224"></a> +<span class="sourceLineNo">3225</span> // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3225"></a> +<span class="sourceLineNo">3226</span> RowLock rowLock = null;<a name="line.3226"></a> +<span class="sourceLineNo">3227</span> try {<a name="line.3227"></a> +<span class="sourceLineNo">3228</span> rowLock = getRowLockInternal(mutation.getRow(), true);<a name="line.3228"></a> +<span class="sourceLineNo">3229</span> } catch (TimeoutIOException e) {<a name="line.3229"></a> +<span class="sourceLineNo">3230</span> // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3230"></a> +<span class="sourceLineNo">3231</span> throw e;<a name="line.3231"></a> +<span class="sourceLineNo">3232</span> } catch (IOException ioe) {<a name="line.3232"></a> +<span class="sourceLineNo">3233</span> LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3233"></a> +<span class="sourceLineNo">3234</span> }<a name="line.3234"></a> +<span class="sourceLineNo">3235</span> if (rowLock == null) {<a name="line.3235"></a> +<span class="sourceLineNo">3236</span> // We failed to grab another lock<a name="line.3236"></a> +<span class="sourceLineNo">3237</span> break; // Stop acquiring more rows for this batch<a name="line.3237"></a> +<span class="sourceLineNo">3238</span> } else {<a name="line.3238"></a> +<span class="sourceLineNo">3239</span> acquiredRowLocks.add(rowLock);<a name="line.3239"></a> +<span class="sourceLineNo">3240</span> }<a name="line.3240"></a> +<span class="sourceLineNo">3241</span><a name="line.3241"></a> +<span class="sourceLineNo">3242</span> lastIndexExclusive++;<a name="line.3242"></a> +<span class="sourceLineNo">3243</span> numReadyToWrite++;<a name="line.3243"></a> +<span class="sourceLineNo">3244</span> if (replay) {<a name="line.3244"></a> +<span class="sourceLineNo">3245</span> for (List<Cell> cells : mutation.getFamilyCellMap().values()) {<a name="line.3245"></a> +<span class="sourceLineNo">3246</span> cellCount += cells.size();<a name="line.3246"></a> +<span class="sourceLineNo">3247</span> }<a name="line.3247"></a> +<span class="sourceLineNo">3248</span> }<a name="line.3248"></a> +<span class="sourceLineNo">3249</span> }<a name="line.3249"></a> +<span class="sourceLineNo">3250</span><a name="line.3250"></a> +<span class="sourceLineNo">3251</span> // We've now grabbed as many mutations off the list as we can<a name="line.3251"></a> +<span class="sourceLineNo">3252</span><a name="line.3252"></a> +<span class="sourceLineNo">3253</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3253"></a> +<span class="sourceLineNo">3254</span> // We should record the timestamp only after we have acquired the rowLock,<a name="line.3254"></a> +<span class="sourceLineNo">3255</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3255"></a> +<span class="sourceLineNo">3256</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3256"></a> +<span class="sourceLineNo">3257</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3257"></a> +<span class="sourceLineNo">3258</span><a name="line.3258"></a> +<span class="sourceLineNo">3259</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3259"></a> +<span class="sourceLineNo">3260</span> if (numReadyToWrite <= 0) {<a name="line.3260"></a> +<span class="sourceLineNo">3261</span> return;<a name="line.3261"></a> +<span class="sourceLineNo">3262</span> }<a name="line.3262"></a> +<span class="sourceLineNo">3263</span><a name="line.3263"></a> +<span class="sourceLineNo">3264</span> for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {<a name="line.3264"></a> +<span class="sourceLineNo">3265</span> // skip invalid<a name="line.3265"></a> +<span class="sourceLineNo">3266</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3266"></a> +<span class="sourceLineNo">3267</span> != OperationStatusCode.NOT_RUN) {<a name="line.3267"></a> +<span class="sourceLineNo">3268</span> // lastIndexExclusive was incremented above.<a name="line.3268"></a> +<span class="sourceLineNo">3269</span> continue;<a name="line.3269"></a> +<span class="sourceLineNo">3270</span> }<a name="line.3270"></a> +<span class="sourceLineNo">3271</span><a name="line.3271"></a> +<span class="sourceLineNo">3272</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3272"></a> +<span class="sourceLineNo">3273</span> if (mutation instanceof Put) {<a name="line.3273"></a> +<span class="sourceLineNo">3274</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3274"></a> +<span class="sourceLineNo">3275</span> noOfPuts++;<a name="line.3275"></a> +<span class="sourceLineNo">3276</span> } else {<a name="line.3276"></a> +<span class="sourceLineNo">3277</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3277"></a> +<span class="sourceLineNo">3278</span> noOfDeletes++;<a name="line.3278"></a> +<span class="sourceLineNo">3279</span> }<a name="line.3279"></a> +<span class="sourceLineNo">3280</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3280"></a> +<span class="sourceLineNo">3281</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3281"></a> +<span class="sourceLineNo">3282</
<TRUNCATED>