http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9b29d8f0/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html index 68e9bfe..0f5a095 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html @@ -7971,832 +7971,819 @@ <span class="sourceLineNo">7963</span><a name="line.7963"></a> <span class="sourceLineNo">7964</span> /**<a name="line.7964"></a> <span class="sourceLineNo">7965</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not<a name="line.7965"></a> -<span class="sourceLineNo">7966</span> * always the same dependent on whether to write WAL or if the amount to increment is zero (in<a name="line.7966"></a> -<span class="sourceLineNo">7967</span> * this case we write back nothing, just return latest Cell value to the client).<a name="line.7967"></a> -<span class="sourceLineNo">7968</span> *<a name="line.7968"></a> -<span class="sourceLineNo">7969</span> * @param results Fill in here what goes back to the Client if it is non-null (if null, client<a name="line.7969"></a> -<span class="sourceLineNo">7970</span> * doesn't want results).<a name="line.7970"></a> -<span class="sourceLineNo">7971</span> * @param forMemStore Fill in here what to apply to the MemStore (by Store).<a name="line.7971"></a> -<span class="sourceLineNo">7972</span> * @return A WALEdit to apply to WAL or null if we are to skip the WAL.<a name="line.7972"></a> -<span class="sourceLineNo">7973</span> */<a name="line.7973"></a> -<span class="sourceLineNo">7974</span> private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,<a name="line.7974"></a> -<span class="sourceLineNo">7975</span> Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {<a name="line.7975"></a> -<span class="sourceLineNo">7976</span> WALEdit walEdit = null;<a name="line.7976"></a> -<span class="sourceLineNo">7977</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7977"></a> -<span class="sourceLineNo">7978</span> final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;<a name="line.7978"></a> -<span class="sourceLineNo">7979</span> // Process a Store/family at a time.<a name="line.7979"></a> -<span class="sourceLineNo">7980</span> for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {<a name="line.7980"></a> -<span class="sourceLineNo">7981</span> final byte[] columnFamilyName = entry.getKey();<a name="line.7981"></a> -<span class="sourceLineNo">7982</span> List<Cell> deltas = entry.getValue();<a name="line.7982"></a> -<span class="sourceLineNo">7983</span> HStore store = this.stores.get(columnFamilyName);<a name="line.7983"></a> -<span class="sourceLineNo">7984</span> // Reckon for the Store what to apply to WAL and MemStore.<a name="line.7984"></a> -<span class="sourceLineNo">7985</span> List<Cell> toApply =<a name="line.7985"></a> -<span class="sourceLineNo">7986</span> reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);<a name="line.7986"></a> -<span class="sourceLineNo">7987</span> if (!toApply.isEmpty()) {<a name="line.7987"></a> -<span class="sourceLineNo">7988</span> forMemStore.put(store, toApply);<a name="line.7988"></a> -<span class="sourceLineNo">7989</span> if (writeToWAL) {<a name="line.7989"></a> -<span class="sourceLineNo">7990</span> if (walEdit == null) {<a name="line.7990"></a> -<span class="sourceLineNo">7991</span> walEdit = new WALEdit();<a name="line.7991"></a> -<span class="sourceLineNo">7992</span> }<a name="line.7992"></a> -<span class="sourceLineNo">7993</span> walEdit.getCells().addAll(toApply);<a name="line.7993"></a> -<span class="sourceLineNo">7994</span> }<a name="line.7994"></a> -<span class="sourceLineNo">7995</span> }<a name="line.7995"></a> -<span class="sourceLineNo">7996</span> }<a name="line.7996"></a> -<span class="sourceLineNo">7997</span> return walEdit;<a name="line.7997"></a> -<span class="sourceLineNo">7998</span> }<a name="line.7998"></a> -<span class="sourceLineNo">7999</span><a name="line.7999"></a> -<span class="sourceLineNo">8000</span> /**<a name="line.8000"></a> -<span class="sourceLineNo">8001</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed<a name="line.8001"></a> -<span class="sourceLineNo">8002</span> * column family/Store.<a name="line.8002"></a> -<span class="sourceLineNo">8003</span> *<a name="line.8003"></a> -<span class="sourceLineNo">8004</span> * Does Get of current value and then adds passed in deltas for this Store returning the result.<a name="line.8004"></a> -<span class="sourceLineNo">8005</span> *<a name="line.8005"></a> -<span class="sourceLineNo">8006</span> * @param op Whether Increment or Append<a name="line.8006"></a> -<span class="sourceLineNo">8007</span> * @param mutation The encompassing Mutation object<a name="line.8007"></a> -<span class="sourceLineNo">8008</span> * @param deltas Changes to apply to this Store; either increment amount or data to append<a name="line.8008"></a> -<span class="sourceLineNo">8009</span> * @param results In here we accumulate all the Cells we are to return to the client; this List<a name="line.8009"></a> -<span class="sourceLineNo">8010</span> * can be larger than what we return in case where delta is zero; i.e. don't write<a name="line.8010"></a> -<span class="sourceLineNo">8011</span> * out new values, just return current value. If null, client doesn't want results returned.<a name="line.8011"></a> -<span class="sourceLineNo">8012</span> * @return Resulting Cells after <code>deltas</code> have been applied to current<a name="line.8012"></a> -<span class="sourceLineNo">8013</span> * values. Side effect is our filling out of the <code>results</code> List.<a name="line.8013"></a> -<span class="sourceLineNo">8014</span> */<a name="line.8014"></a> -<span class="sourceLineNo">8015</span> private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,<a name="line.8015"></a> -<span class="sourceLineNo">8016</span> Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)<a name="line.8016"></a> -<span class="sourceLineNo">8017</span> throws IOException {<a name="line.8017"></a> -<span class="sourceLineNo">8018</span> byte[] columnFamily = store.getColumnFamilyDescriptor().getName();<a name="line.8018"></a> -<span class="sourceLineNo">8019</span> List<Cell> toApply = new ArrayList<>(deltas.size());<a name="line.8019"></a> -<span class="sourceLineNo">8020</span> // Get previous values for all columns in this family.<a name="line.8020"></a> -<span class="sourceLineNo">8021</span> TimeRange tr = null;<a name="line.8021"></a> -<span class="sourceLineNo">8022</span> switch (op) {<a name="line.8022"></a> -<span class="sourceLineNo">8023</span> case INCREMENT:<a name="line.8023"></a> -<span class="sourceLineNo">8024</span> tr = ((Increment)mutation).getTimeRange();<a name="line.8024"></a> -<span class="sourceLineNo">8025</span> break;<a name="line.8025"></a> -<span class="sourceLineNo">8026</span> case APPEND:<a name="line.8026"></a> -<span class="sourceLineNo">8027</span> tr = ((Append)mutation).getTimeRange();<a name="line.8027"></a> +<span class="sourceLineNo">7966</span> * always the same dependent on whether to write WAL.<a name="line.7966"></a> +<span class="sourceLineNo">7967</span> *<a name="line.7967"></a> +<span class="sourceLineNo">7968</span> * @param results Fill in here what goes back to the Client if it is non-null (if null, client<a name="line.7968"></a> +<span class="sourceLineNo">7969</span> * doesn't want results).<a name="line.7969"></a> +<span class="sourceLineNo">7970</span> * @param forMemStore Fill in here what to apply to the MemStore (by Store).<a name="line.7970"></a> +<span class="sourceLineNo">7971</span> * @return A WALEdit to apply to WAL or null if we are to skip the WAL.<a name="line.7971"></a> +<span class="sourceLineNo">7972</span> */<a name="line.7972"></a> +<span class="sourceLineNo">7973</span> private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,<a name="line.7973"></a> +<span class="sourceLineNo">7974</span> Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {<a name="line.7974"></a> +<span class="sourceLineNo">7975</span> WALEdit walEdit = null;<a name="line.7975"></a> +<span class="sourceLineNo">7976</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7976"></a> +<span class="sourceLineNo">7977</span> final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;<a name="line.7977"></a> +<span class="sourceLineNo">7978</span> // Process a Store/family at a time.<a name="line.7978"></a> +<span class="sourceLineNo">7979</span> for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {<a name="line.7979"></a> +<span class="sourceLineNo">7980</span> final byte[] columnFamilyName = entry.getKey();<a name="line.7980"></a> +<span class="sourceLineNo">7981</span> List<Cell> deltas = entry.getValue();<a name="line.7981"></a> +<span class="sourceLineNo">7982</span> HStore store = this.stores.get(columnFamilyName);<a name="line.7982"></a> +<span class="sourceLineNo">7983</span> // Reckon for the Store what to apply to WAL and MemStore.<a name="line.7983"></a> +<span class="sourceLineNo">7984</span> List<Cell> toApply =<a name="line.7984"></a> +<span class="sourceLineNo">7985</span> reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);<a name="line.7985"></a> +<span class="sourceLineNo">7986</span> if (!toApply.isEmpty()) {<a name="line.7986"></a> +<span class="sourceLineNo">7987</span> forMemStore.put(store, toApply);<a name="line.7987"></a> +<span class="sourceLineNo">7988</span> if (writeToWAL) {<a name="line.7988"></a> +<span class="sourceLineNo">7989</span> if (walEdit == null) {<a name="line.7989"></a> +<span class="sourceLineNo">7990</span> walEdit = new WALEdit();<a name="line.7990"></a> +<span class="sourceLineNo">7991</span> }<a name="line.7991"></a> +<span class="sourceLineNo">7992</span> walEdit.getCells().addAll(toApply);<a name="line.7992"></a> +<span class="sourceLineNo">7993</span> }<a name="line.7993"></a> +<span class="sourceLineNo">7994</span> }<a name="line.7994"></a> +<span class="sourceLineNo">7995</span> }<a name="line.7995"></a> +<span class="sourceLineNo">7996</span> return walEdit;<a name="line.7996"></a> +<span class="sourceLineNo">7997</span> }<a name="line.7997"></a> +<span class="sourceLineNo">7998</span><a name="line.7998"></a> +<span class="sourceLineNo">7999</span> /**<a name="line.7999"></a> +<span class="sourceLineNo">8000</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed<a name="line.8000"></a> +<span class="sourceLineNo">8001</span> * column family/Store.<a name="line.8001"></a> +<span class="sourceLineNo">8002</span> *<a name="line.8002"></a> +<span class="sourceLineNo">8003</span> * Does Get of current value and then adds passed in deltas for this Store returning the result.<a name="line.8003"></a> +<span class="sourceLineNo">8004</span> *<a name="line.8004"></a> +<span class="sourceLineNo">8005</span> * @param op Whether Increment or Append<a name="line.8005"></a> +<span class="sourceLineNo">8006</span> * @param mutation The encompassing Mutation object<a name="line.8006"></a> +<span class="sourceLineNo">8007</span> * @param deltas Changes to apply to this Store; either increment amount or data to append<a name="line.8007"></a> +<span class="sourceLineNo">8008</span> * @param results In here we accumulate all the Cells we are to return to the client. If null,<a name="line.8008"></a> +<span class="sourceLineNo">8009</span> * client doesn't want results returned.<a name="line.8009"></a> +<span class="sourceLineNo">8010</span> * @return Resulting Cells after <code>deltas</code> have been applied to current<a name="line.8010"></a> +<span class="sourceLineNo">8011</span> * values. Side effect is our filling out of the <code>results</code> List.<a name="line.8011"></a> +<span class="sourceLineNo">8012</span> */<a name="line.8012"></a> +<span class="sourceLineNo">8013</span> private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,<a name="line.8013"></a> +<span class="sourceLineNo">8014</span> Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)<a name="line.8014"></a> +<span class="sourceLineNo">8015</span> throws IOException {<a name="line.8015"></a> +<span class="sourceLineNo">8016</span> byte[] columnFamily = store.getColumnFamilyDescriptor().getName();<a name="line.8016"></a> +<span class="sourceLineNo">8017</span> List<Cell> toApply = new ArrayList<>(deltas.size());<a name="line.8017"></a> +<span class="sourceLineNo">8018</span> // Get previous values for all columns in this family.<a name="line.8018"></a> +<span class="sourceLineNo">8019</span> TimeRange tr = null;<a name="line.8019"></a> +<span class="sourceLineNo">8020</span> switch (op) {<a name="line.8020"></a> +<span class="sourceLineNo">8021</span> case INCREMENT:<a name="line.8021"></a> +<span class="sourceLineNo">8022</span> tr = ((Increment)mutation).getTimeRange();<a name="line.8022"></a> +<span class="sourceLineNo">8023</span> break;<a name="line.8023"></a> +<span class="sourceLineNo">8024</span> case APPEND:<a name="line.8024"></a> +<span class="sourceLineNo">8025</span> tr = ((Append)mutation).getTimeRange();<a name="line.8025"></a> +<span class="sourceLineNo">8026</span> break;<a name="line.8026"></a> +<span class="sourceLineNo">8027</span> default:<a name="line.8027"></a> <span class="sourceLineNo">8028</span> break;<a name="line.8028"></a> -<span class="sourceLineNo">8029</span> default:<a name="line.8029"></a> -<span class="sourceLineNo">8030</span> break;<a name="line.8030"></a> -<span class="sourceLineNo">8031</span> }<a name="line.8031"></a> -<span class="sourceLineNo">8032</span> List<Cell> currentValues = get(mutation, store, deltas,null, tr);<a name="line.8032"></a> -<span class="sourceLineNo">8033</span> // Iterate the input columns and update existing values if they were found, otherwise<a name="line.8033"></a> -<span class="sourceLineNo">8034</span> // add new column initialized to the delta amount<a name="line.8034"></a> -<span class="sourceLineNo">8035</span> int currentValuesIndex = 0;<a name="line.8035"></a> -<span class="sourceLineNo">8036</span> for (int i = 0; i < deltas.size(); i++) {<a name="line.8036"></a> -<span class="sourceLineNo">8037</span> Cell delta = deltas.get(i);<a name="line.8037"></a> -<span class="sourceLineNo">8038</span> Cell currentValue = null;<a name="line.8038"></a> -<span class="sourceLineNo">8039</span> boolean firstWrite = false;<a name="line.8039"></a> -<span class="sourceLineNo">8040</span> if (currentValuesIndex < currentValues.size() &&<a name="line.8040"></a> -<span class="sourceLineNo">8041</span> CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {<a name="line.8041"></a> -<span class="sourceLineNo">8042</span> currentValue = currentValues.get(currentValuesIndex);<a name="line.8042"></a> -<span class="sourceLineNo">8043</span> if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {<a name="line.8043"></a> -<span class="sourceLineNo">8044</span> currentValuesIndex++;<a name="line.8044"></a> -<span class="sourceLineNo">8045</span> }<a name="line.8045"></a> -<span class="sourceLineNo">8046</span> } else {<a name="line.8046"></a> -<span class="sourceLineNo">8047</span> firstWrite = true;<a name="line.8047"></a> -<span class="sourceLineNo">8048</span> }<a name="line.8048"></a> -<span class="sourceLineNo">8049</span> // Switch on whether this an increment or an append building the new Cell to apply.<a name="line.8049"></a> -<span class="sourceLineNo">8050</span> Cell newCell = null;<a name="line.8050"></a> -<span class="sourceLineNo">8051</span> MutationType mutationType = null;<a name="line.8051"></a> -<span class="sourceLineNo">8052</span> boolean apply = true;<a name="line.8052"></a> -<span class="sourceLineNo">8053</span> switch (op) {<a name="line.8053"></a> -<span class="sourceLineNo">8054</span> case INCREMENT:<a name="line.8054"></a> -<span class="sourceLineNo">8055</span> mutationType = MutationType.INCREMENT;<a name="line.8055"></a> -<span class="sourceLineNo">8056</span> // If delta amount to apply is 0, don't write WAL or MemStore.<a name="line.8056"></a> -<span class="sourceLineNo">8057</span> long deltaAmount = getLongValue(delta);<a name="line.8057"></a> -<span class="sourceLineNo">8058</span> // TODO: Does zero value mean reset Cell? For example, the ttl.<a name="line.8058"></a> -<span class="sourceLineNo">8059</span> apply = deltaAmount != 0;<a name="line.8059"></a> -<span class="sourceLineNo">8060</span> final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;<a name="line.8060"></a> -<span class="sourceLineNo">8061</span> newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));<a name="line.8061"></a> +<span class="sourceLineNo">8029</span> }<a name="line.8029"></a> +<span class="sourceLineNo">8030</span> List<Cell> currentValues = get(mutation, store, deltas,null, tr);<a name="line.8030"></a> +<span class="sourceLineNo">8031</span> // Iterate the input columns and update existing values if they were found, otherwise<a name="line.8031"></a> +<span class="sourceLineNo">8032</span> // add new column initialized to the delta amount<a name="line.8032"></a> +<span class="sourceLineNo">8033</span> int currentValuesIndex = 0;<a name="line.8033"></a> +<span class="sourceLineNo">8034</span> for (int i = 0; i < deltas.size(); i++) {<a name="line.8034"></a> +<span class="sourceLineNo">8035</span> Cell delta = deltas.get(i);<a name="line.8035"></a> +<span class="sourceLineNo">8036</span> Cell currentValue = null;<a name="line.8036"></a> +<span class="sourceLineNo">8037</span> if (currentValuesIndex < currentValues.size() &&<a name="line.8037"></a> +<span class="sourceLineNo">8038</span> CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {<a name="line.8038"></a> +<span class="sourceLineNo">8039</span> currentValue = currentValues.get(currentValuesIndex);<a name="line.8039"></a> +<span class="sourceLineNo">8040</span> if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {<a name="line.8040"></a> +<span class="sourceLineNo">8041</span> currentValuesIndex++;<a name="line.8041"></a> +<span class="sourceLineNo">8042</span> }<a name="line.8042"></a> +<span class="sourceLineNo">8043</span> }<a name="line.8043"></a> +<span class="sourceLineNo">8044</span> // Switch on whether this an increment or an append building the new Cell to apply.<a name="line.8044"></a> +<span class="sourceLineNo">8045</span> Cell newCell = null;<a name="line.8045"></a> +<span class="sourceLineNo">8046</span> MutationType mutationType = null;<a name="line.8046"></a> +<span class="sourceLineNo">8047</span> switch (op) {<a name="line.8047"></a> +<span class="sourceLineNo">8048</span> case INCREMENT:<a name="line.8048"></a> +<span class="sourceLineNo">8049</span> mutationType = MutationType.INCREMENT;<a name="line.8049"></a> +<span class="sourceLineNo">8050</span> long deltaAmount = getLongValue(delta);<a name="line.8050"></a> +<span class="sourceLineNo">8051</span> final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;<a name="line.8051"></a> +<span class="sourceLineNo">8052</span> newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));<a name="line.8052"></a> +<span class="sourceLineNo">8053</span> break;<a name="line.8053"></a> +<span class="sourceLineNo">8054</span> case APPEND:<a name="line.8054"></a> +<span class="sourceLineNo">8055</span> mutationType = MutationType.APPEND;<a name="line.8055"></a> +<span class="sourceLineNo">8056</span> newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -><a name="line.8056"></a> +<span class="sourceLineNo">8057</span> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])<a name="line.8057"></a> +<span class="sourceLineNo">8058</span> .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())<a name="line.8058"></a> +<span class="sourceLineNo">8059</span> .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())<a name="line.8059"></a> +<span class="sourceLineNo">8060</span> .array()<a name="line.8060"></a> +<span class="sourceLineNo">8061</span> );<a name="line.8061"></a> <span class="sourceLineNo">8062</span> break;<a name="line.8062"></a> -<span class="sourceLineNo">8063</span> case APPEND:<a name="line.8063"></a> -<span class="sourceLineNo">8064</span> mutationType = MutationType.APPEND;<a name="line.8064"></a> -<span class="sourceLineNo">8065</span> // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to.<a name="line.8065"></a> -<span class="sourceLineNo">8066</span> newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -><a name="line.8066"></a> -<span class="sourceLineNo">8067</span> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])<a name="line.8067"></a> -<span class="sourceLineNo">8068</span> .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())<a name="line.8068"></a> -<span class="sourceLineNo">8069</span> .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())<a name="line.8069"></a> -<span class="sourceLineNo">8070</span> .array()<a name="line.8070"></a> -<span class="sourceLineNo">8071</span> );<a name="line.8071"></a> -<span class="sourceLineNo">8072</span> break;<a name="line.8072"></a> -<span class="sourceLineNo">8073</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.8073"></a> -<span class="sourceLineNo">8074</span> }<a name="line.8074"></a> -<span class="sourceLineNo">8075</span><a name="line.8075"></a> -<span class="sourceLineNo">8076</span> // Give coprocessors a chance to update the new cell<a name="line.8076"></a> -<span class="sourceLineNo">8077</span> if (coprocessorHost != null) {<a name="line.8077"></a> -<span class="sourceLineNo">8078</span> newCell =<a name="line.8078"></a> -<span class="sourceLineNo">8079</span> coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);<a name="line.8079"></a> -<span class="sourceLineNo">8080</span> }<a name="line.8080"></a> -<span class="sourceLineNo">8081</span> // If apply, we need to update memstore/WAL with new value; add it toApply.<a name="line.8081"></a> -<span class="sourceLineNo">8082</span> if (apply || firstWrite) {<a name="line.8082"></a> -<span class="sourceLineNo">8083</span> toApply.add(newCell);<a name="line.8083"></a> -<span class="sourceLineNo">8084</span> }<a name="line.8084"></a> -<span class="sourceLineNo">8085</span> // Add to results to get returned to the Client. If null, cilent does not want results.<a name="line.8085"></a> -<span class="sourceLineNo">8086</span> if (results != null) {<a name="line.8086"></a> -<span class="sourceLineNo">8087</span> results.add(newCell);<a name="line.8087"></a> -<span class="sourceLineNo">8088</span> }<a name="line.8088"></a> -<span class="sourceLineNo">8089</span> }<a name="line.8089"></a> -<span class="sourceLineNo">8090</span> return toApply;<a name="line.8090"></a> -<span class="sourceLineNo">8091</span> }<a name="line.8091"></a> -<span class="sourceLineNo">8092</span><a name="line.8092"></a> -<span class="sourceLineNo">8093</span> private static Cell reckonDelta(final Cell delta, final Cell currentCell,<a name="line.8093"></a> -<span class="sourceLineNo">8094</span> final byte[] columnFamily, final long now,<a name="line.8094"></a> -<span class="sourceLineNo">8095</span> Mutation mutation, Function<Cell, byte[]> supplier) throws IOException {<a name="line.8095"></a> -<span class="sourceLineNo">8096</span> // Forward any tags found on the delta.<a name="line.8096"></a> -<span class="sourceLineNo">8097</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.8097"></a> -<span class="sourceLineNo">8098</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.8098"></a> -<span class="sourceLineNo">8099</span> if (currentCell != null) {<a name="line.8099"></a> -<span class="sourceLineNo">8100</span> tags = TagUtil.carryForwardTags(tags, currentCell);<a name="line.8100"></a> -<span class="sourceLineNo">8101</span> byte[] newValue = supplier.apply(currentCell);<a name="line.8101"></a> -<span class="sourceLineNo">8102</span> return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)<a name="line.8102"></a> -<span class="sourceLineNo">8103</span> .setRow(mutation.getRow(), 0, mutation.getRow().length)<a name="line.8103"></a> -<span class="sourceLineNo">8104</span> .setFamily(columnFamily, 0, columnFamily.length)<a name="line.8104"></a> -<span class="sourceLineNo">8105</span> // copy the qualifier if the cell is located in shared memory.<a name="line.8105"></a> -<span class="sourceLineNo">8106</span> .setQualifier(CellUtil.cloneQualifier(delta))<a name="line.8106"></a> -<span class="sourceLineNo">8107</span> .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))<a name="line.8107"></a> -<span class="sourceLineNo">8108</span> .setType(KeyValue.Type.Put.getCode())<a name="line.8108"></a> -<span class="sourceLineNo">8109</span> .setValue(newValue, 0, newValue.length)<a name="line.8109"></a> -<span class="sourceLineNo">8110</span> .setTags(TagUtil.fromList(tags))<a name="line.8110"></a> -<span class="sourceLineNo">8111</span> .build();<a name="line.8111"></a> -<span class="sourceLineNo">8112</span> } else {<a name="line.8112"></a> -<span class="sourceLineNo">8113</span> PrivateCellUtil.updateLatestStamp(delta, now);<a name="line.8113"></a> -<span class="sourceLineNo">8114</span> return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);<a name="line.8114"></a> -<span class="sourceLineNo">8115</span> }<a name="line.8115"></a> -<span class="sourceLineNo">8116</span> }<a name="line.8116"></a> -<span class="sourceLineNo">8117</span><a name="line.8117"></a> -<span class="sourceLineNo">8118</span> /**<a name="line.8118"></a> -<span class="sourceLineNo">8119</span> * @return Get the long out of the passed in Cell<a name="line.8119"></a> -<span class="sourceLineNo">8120</span> */<a name="line.8120"></a> -<span class="sourceLineNo">8121</span> private static long getLongValue(final Cell cell) throws DoNotRetryIOException {<a name="line.8121"></a> -<span class="sourceLineNo">8122</span> int len = cell.getValueLength();<a name="line.8122"></a> -<span class="sourceLineNo">8123</span> if (len != Bytes.SIZEOF_LONG) {<a name="line.8123"></a> -<span class="sourceLineNo">8124</span> // throw DoNotRetryIOException instead of IllegalArgumentException<a name="line.8124"></a> -<span class="sourceLineNo">8125</span> throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");<a name="line.8125"></a> -<span class="sourceLineNo">8126</span> }<a name="line.8126"></a> -<span class="sourceLineNo">8127</span> return PrivateCellUtil.getValueAsLong(cell);<a name="line.8127"></a> -<span class="sourceLineNo">8128</span> }<a name="line.8128"></a> -<span class="sourceLineNo">8129</span><a name="line.8129"></a> -<span class="sourceLineNo">8130</span> /**<a name="line.8130"></a> -<span class="sourceLineNo">8131</span> * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.<a name="line.8131"></a> -<span class="sourceLineNo">8132</span> * @param mutation Mutation we are doing this Get for.<a name="line.8132"></a> -<span class="sourceLineNo">8133</span> * @param store Which column family on row (TODO: Go all Gets in one go)<a name="line.8133"></a> -<span class="sourceLineNo">8134</span> * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.<a name="line.8134"></a> -<span class="sourceLineNo">8135</span> * @return Return list of Cells found.<a name="line.8135"></a> -<span class="sourceLineNo">8136</span> */<a name="line.8136"></a> -<span class="sourceLineNo">8137</span> private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,<a name="line.8137"></a> -<span class="sourceLineNo">8138</span> IsolationLevel isolation, TimeRange tr) throws IOException {<a name="line.8138"></a> -<span class="sourceLineNo">8139</span> // Sort the cells so that they match the order that they appear in the Get results. Otherwise,<a name="line.8139"></a> -<span class="sourceLineNo">8140</span> // we won't be able to find the existing values if the cells are not specified in order by the<a name="line.8140"></a> -<span class="sourceLineNo">8141</span> // client since cells are in an array list.<a name="line.8141"></a> -<span class="sourceLineNo">8142</span> // TODO: I don't get why we are sorting. St.Ack 20150107<a name="line.8142"></a> -<span class="sourceLineNo">8143</span> sort(coordinates, store.getComparator());<a name="line.8143"></a> -<span class="sourceLineNo">8144</span> Get get = new Get(mutation.getRow());<a name="line.8144"></a> -<span class="sourceLineNo">8145</span> if (isolation != null) {<a name="line.8145"></a> -<span class="sourceLineNo">8146</span> get.setIsolationLevel(isolation);<a name="line.8146"></a> -<span class="sourceLineNo">8147</span> }<a name="line.8147"></a> -<span class="sourceLineNo">8148</span> for (Cell cell: coordinates) {<a name="line.8148"></a> -<span class="sourceLineNo">8149</span> get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));<a name="line.8149"></a> -<span class="sourceLineNo">8150</span> }<a name="line.8150"></a> -<span class="sourceLineNo">8151</span> // Increments carry time range. If an Increment instance, put it on the Get.<a name="line.8151"></a> -<span class="sourceLineNo">8152</span> if (tr != null) {<a name="line.8152"></a> -<span class="sourceLineNo">8153</span> get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.8153"></a> -<span class="sourceLineNo">8154</span> }<a name="line.8154"></a> -<span class="sourceLineNo">8155</span> return get(get, false);<a name="line.8155"></a> -<span class="sourceLineNo">8156</span> }<a name="line.8156"></a> -<span class="sourceLineNo">8157</span><a name="line.8157"></a> -<span class="sourceLineNo">8158</span> /**<a name="line.8158"></a> -<span class="sourceLineNo">8159</span> * @return Sorted list of <code>cells</code> using <code>comparator</code><a name="line.8159"></a> -<span class="sourceLineNo">8160</span> */<a name="line.8160"></a> -<span class="sourceLineNo">8161</span> private static List<Cell> sort(List<Cell> cells, final CellComparator comparator) {<a name="line.8161"></a> -<span class="sourceLineNo">8162</span> cells.sort(comparator);<a name="line.8162"></a> -<span class="sourceLineNo">8163</span> return cells;<a name="line.8163"></a> -<span class="sourceLineNo">8164</span> }<a name="line.8164"></a> -<span class="sourceLineNo">8165</span><a name="line.8165"></a> -<span class="sourceLineNo">8166</span> public static final long FIXED_OVERHEAD = ClassSize.align(<a name="line.8166"></a> -<span class="sourceLineNo">8167</span> ClassSize.OBJECT +<a name="line.8167"></a> -<span class="sourceLineNo">8168</span> ClassSize.ARRAY +<a name="line.8168"></a> -<span class="sourceLineNo">8169</span> 55 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +<a name="line.8169"></a> -<span class="sourceLineNo">8170</span> (15 * Bytes.SIZEOF_LONG) +<a name="line.8170"></a> -<span class="sourceLineNo">8171</span> 3 * Bytes.SIZEOF_BOOLEAN);<a name="line.8171"></a> -<span class="sourceLineNo">8172</span><a name="line.8172"></a> -<span class="sourceLineNo">8173</span> // woefully out of date - currently missing:<a name="line.8173"></a> -<span class="sourceLineNo">8174</span> // 1 x HashMap - coprocessorServiceHandlers<a name="line.8174"></a> -<span class="sourceLineNo">8175</span> // 6 x LongAdder - numMutationsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.8175"></a> -<span class="sourceLineNo">8176</span> // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,<a name="line.8176"></a> -<span class="sourceLineNo">8177</span> // writeRequestsCount, cpRequestsCount<a name="line.8177"></a> -<span class="sourceLineNo">8178</span> // 1 x HRegion$WriteState - writestate<a name="line.8178"></a> -<span class="sourceLineNo">8179</span> // 1 x RegionCoprocessorHost - coprocessorHost<a name="line.8179"></a> -<span class="sourceLineNo">8180</span> // 1 x RegionSplitPolicy - splitPolicy<a name="line.8180"></a> -<span class="sourceLineNo">8181</span> // 1 x MetricsRegion - metricsRegion<a name="line.8181"></a> -<span class="sourceLineNo">8182</span> // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper<a name="line.8182"></a> -<span class="sourceLineNo">8183</span> public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +<a name="line.8183"></a> -<span class="sourceLineNo">8184</span> ClassSize.OBJECT + // closeLock<a name="line.8184"></a> -<span class="sourceLineNo">8185</span> (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing<a name="line.8185"></a> -<span class="sourceLineNo">8186</span> (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.8186"></a> -<span class="sourceLineNo">8187</span> // compactionsFailed<a name="line.8187"></a> -<span class="sourceLineNo">8188</span> (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints<a name="line.8188"></a> -<span class="sourceLineNo">8189</span> WriteState.HEAP_SIZE + // writestate<a name="line.8189"></a> -<span class="sourceLineNo">8190</span> ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores<a name="line.8190"></a> -<span class="sourceLineNo">8191</span> (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock<a name="line.8191"></a> -<span class="sourceLineNo">8192</span> MultiVersionConcurrencyControl.FIXED_SIZE // mvcc<a name="line.8192"></a> -<span class="sourceLineNo">8193</span> + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes<a name="line.8193"></a> -<span class="sourceLineNo">8194</span> + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress<a name="line.8194"></a> -<span class="sourceLineNo">8195</span> + ClassSize.STORE_SERVICES // store services<a name="line.8195"></a> -<span class="sourceLineNo">8196</span> + StoreHotnessProtector.FIXED_SIZE<a name="line.8196"></a> -<span class="sourceLineNo">8197</span> ;<a name="line.8197"></a> -<span class="sourceLineNo">8198</span><a name="line.8198"></a> -<span class="sourceLineNo">8199</span> @Override<a name="line.8199"></a> -<span class="sourceLineNo">8200</span> public long heapSize() {<a name="line.8200"></a> -<span class="sourceLineNo">8201</span> // this does not take into account row locks, recent flushes, mvcc entries, and more<a name="line.8201"></a> -<span class="sourceLineNo">8202</span> return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum();<a name="line.8202"></a> -<span class="sourceLineNo">8203</span> }<a name="line.8203"></a> -<span class="sourceLineNo">8204</span><a name="line.8204"></a> -<span class="sourceLineNo">8205</span> /**<a name="line.8205"></a> -<span class="sourceLineNo">8206</span> * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to<a name="line.8206"></a> -<span class="sourceLineNo">8207</span> * be available for handling Region#execService(com.google.protobuf.RpcController,<a name="line.8207"></a> -<span class="sourceLineNo">8208</span> * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall) calls.<a name="line.8208"></a> -<span class="sourceLineNo">8209</span> *<a name="line.8209"></a> -<span class="sourceLineNo">8210</span> * <p><a name="line.8210"></a> -<span class="sourceLineNo">8211</span> * Only a single instance may be registered per region for a given {@link Service} subclass (the<a name="line.8211"></a> -<span class="sourceLineNo">8212</span> * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.<a name="line.8212"></a> -<span class="sourceLineNo">8213</span> * After the first registration, subsequent calls with the same service name will fail with<a name="line.8213"></a> -<span class="sourceLineNo">8214</span> * a return value of {@code false}.<a name="line.8214"></a> -<span class="sourceLineNo">8215</span> * </p><a name="line.8215"></a> -<span class="sourceLineNo">8216</span> * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint<a name="line.8216"></a> -<span class="sourceLineNo">8217</span> * @return {@code true} if the registration was successful, {@code false}<a name="line.8217"></a> -<span class="sourceLineNo">8218</span> * otherwise<a name="line.8218"></a> -<span class="sourceLineNo">8219</span> */<a name="line.8219"></a> -<span class="sourceLineNo">8220</span> public boolean registerService(com.google.protobuf.Service instance) {<a name="line.8220"></a> -<span class="sourceLineNo">8221</span> /*<a name="line.8221"></a> -<span class="sourceLineNo">8222</span> * No stacking of instances is allowed for a single service name<a name="line.8222"></a> -<span class="sourceLineNo">8223</span> */<a name="line.8223"></a> -<span class="sourceLineNo">8224</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();<a name="line.8224"></a> -<span class="sourceLineNo">8225</span> String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);<a name="line.8225"></a> -<span class="sourceLineNo">8226</span> if (coprocessorServiceHandlers.containsKey(serviceName)) {<a name="line.8226"></a> -<span class="sourceLineNo">8227</span> LOG.error("Coprocessor service " + serviceName +<a name="line.8227"></a> -<span class="sourceLineNo">8228</span> " already registered, rejecting request from " + instance);<a name="line.8228"></a> -<span class="sourceLineNo">8229</span> return false;<a name="line.8229"></a> -<span class="sourceLineNo">8230</span> }<a name="line.8230"></a> -<span class="sourceLineNo">8231</span><a name="line.8231"></a> -<span class="sourceLineNo">8232</span> coprocessorServiceHandlers.put(serviceName, instance);<a name="line.8232"></a> -<span class="sourceLineNo">8233</span> if (LOG.isDebugEnabled()) {<a name="line.8233"></a> -<span class="sourceLineNo">8234</span> LOG.debug("Registered coprocessor service: region=" +<a name="line.8234"></a> -<span class="sourceLineNo">8235</span> Bytes.toStringBinary(getRegionInfo().getRegionName()) +<a name="line.8235"></a> -<span class="sourceLineNo">8236</span> " service=" + serviceName);<a name="line.8236"></a> -<span class="sourceLineNo">8237</span> }<a name="line.8237"></a> -<span class="sourceLineNo">8238</span> return true;<a name="line.8238"></a> -<span class="sourceLineNo">8239</span> }<a name="line.8239"></a> -<span class="sourceLineNo">8240</span><a name="line.8240"></a> -<span class="sourceLineNo">8241</span> /**<a name="line.8241"></a> -<span class="sourceLineNo">8242</span> * Executes a single protocol buffer coprocessor endpoint {@link Service} method using<a name="line.8242"></a> -<span class="sourceLineNo">8243</span> * the registered protocol handlers. {@link Service} implementations must be registered via the<a name="line.8243"></a> -<span class="sourceLineNo">8244</span> * {@link #registerService(com.google.protobuf.Service)}<a name="line.8244"></a> -<span class="sourceLineNo">8245</span> * method before they are available.<a name="line.8245"></a> -<span class="sourceLineNo">8246</span> *<a name="line.8246"></a> -<span class="sourceLineNo">8247</span> * @param controller an {@code RpcContoller} implementation to pass to the invoked service<a name="line.8247"></a> -<span class="sourceLineNo">8248</span> * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,<a name="line.8248"></a> -<span class="sourceLineNo">8249</span> * and parameters for the method invocation<a name="line.8249"></a> -<span class="sourceLineNo">8250</span> * @return a protocol buffer {@code Message} instance containing the method's result<a name="line.8250"></a> -<span class="sourceLineNo">8251</span> * @throws IOException if no registered service handler is found or an error<a name="line.8251"></a> -<span class="sourceLineNo">8252</span> * occurs during the invocation<a name="line.8252"></a> -<span class="sourceLineNo">8253</span> * @see #registerService(com.google.protobuf.Service)<a name="line.8253"></a> -<span class="sourceLineNo">8254</span> */<a name="line.8254"></a> -<span class="sourceLineNo">8255</span> public com.google.protobuf.Message execService(com.google.protobuf.RpcController controller,<a name="line.8255"></a> -<span class="sourceLineNo">8256</span> CoprocessorServiceCall call) throws IOException {<a name="line.8256"></a> -<span class="sourceLineNo">8257</span> String serviceName = call.getServiceName();<a name="line.8257"></a> -<span class="sourceLineNo">8258</span> com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);<a name="line.8258"></a> -<span class="sourceLineNo">8259</span> if (service == null) {<a name="line.8259"></a> -<span class="sourceLineNo">8260</span> throw new UnknownProtocolException(null, "No registered coprocessor service found for " +<a name="line.8260"></a> -<span class="sourceLineNo">8261</span> serviceName + " in region " + Bytes.toStringBinary(getRegionInfo().getRegionName()));<a name="line.8261"></a> -<span class="sourceLineNo">8262</span> }<a name="line.8262"></a> -<span class="sourceLineNo">8263</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();<a name="line.8263"></a> +<span class="sourceLineNo">8063</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.8063"></a> +<span class="sourceLineNo">8064</span> }<a name="line.8064"></a> +<span class="sourceLineNo">8065</span><a name="line.8065"></a> +<span class="sourceLineNo">8066</span> // Give coprocessors a chance to update the new cell<a name="line.8066"></a> +<span class="sourceLineNo">8067</span> if (coprocessorHost != null) {<a name="line.8067"></a> +<span class="sourceLineNo">8068</span> newCell =<a name="line.8068"></a> +<span class="sourceLineNo">8069</span> coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);<a name="line.8069"></a> +<span class="sourceLineNo">8070</span> }<a name="line.8070"></a> +<span class="sourceLineNo">8071</span> toApply.add(newCell);<a name="line.8071"></a> +<span class="sourceLineNo">8072</span> // Add to results to get returned to the Client. If null, cilent does not want results.<a name="line.8072"></a> +<span class="sourceLineNo">8073</span> if (results != null) {<a name="line.8073"></a> +<span class="sourceLineNo">8074</span> results.add(newCell);<a name="line.8074"></a> +<span class="sourceLineNo">8075</span> }<a name="line.8075"></a> +<span class="sourceLineNo">8076</span> }<a name="line.8076"></a> +<span class="sourceLineNo">8077</span> return toApply;<a name="line.8077"></a> +<span class="sourceLineNo">8078</span> }<a name="line.8078"></a> +<span class="sourceLineNo">8079</span><a name="line.8079"></a> +<span class="sourceLineNo">8080</span> private static Cell reckonDelta(final Cell delta, final Cell currentCell,<a name="line.8080"></a> +<span class="sourceLineNo">8081</span> final byte[] columnFamily, final long now,<a name="line.8081"></a> +<span class="sourceLineNo">8082</span> Mutation mutation, Function<Cell, byte[]> supplier) throws IOException {<a name="line.8082"></a> +<span class="sourceLineNo">8083</span> // Forward any tags found on the delta.<a name="line.8083"></a> +<span class="sourceLineNo">8084</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.8084"></a> +<span class="sourceLineNo">8085</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.8085"></a> +<span class="sourceLineNo">8086</span> if (currentCell != null) {<a name="line.8086"></a> +<span class="sourceLineNo">8087</span> tags = TagUtil.carryForwardTags(tags, currentCell);<a name="line.8087"></a> +<span class="sourceLineNo">8088</span> byte[] newValue = supplier.apply(currentCell);<a name="line.8088"></a> +<span class="sourceLineNo">8089</span> return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)<a name="line.8089"></a> +<span class="sourceLineNo">8090</span> .setRow(mutation.getRow(), 0, mutation.getRow().length)<a name="line.8090"></a> +<span class="sourceLineNo">8091</span> .setFamily(columnFamily, 0, columnFamily.length)<a name="line.8091"></a> +<span class="sourceLineNo">8092</span> // copy the qualifier if the cell is located in shared memory.<a name="line.8092"></a> +<span class="sourceLineNo">8093</span> .setQualifier(CellUtil.cloneQualifier(delta))<a name="line.8093"></a> +<span class="sourceLineNo">8094</span> .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))<a name="line.8094"></a> +<span class="sourceLineNo">8095</span> .setType(KeyValue.Type.Put.getCode())<a name="line.8095"></a> +<span class="sourceLineNo">8096</span> .setValue(newValue, 0, newValue.length)<a name="line.8096"></a> +<span class="sourceLineNo">8097</span> .setTags(TagUtil.fromList(tags))<a name="line.8097"></a> +<span class="sourceLineNo">8098</span> .build();<a name="line.8098"></a> +<span class="sourceLineNo">8099</span> } else {<a name="line.8099"></a> +<span class="sourceLineNo">8100</span> PrivateCellUtil.updateLatestStamp(delta, now);<a name="line.8100"></a> +<span class="sourceLineNo">8101</span> return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);<a name="line.8101"></a> +<span class="sourceLineNo">8102</span> }<a name="line.8102"></a> +<span class="sourceLineNo">8103</span> }<a name="line.8103"></a> +<span class="sourceLineNo">8104</span><a name="line.8104"></a> +<span class="sourceLineNo">8105</span> /**<a name="line.8105"></a> +<span class="sourceLineNo">8106</span> * @return Get the long out of the passed in Cell<a name="line.8106"></a> +<span class="sourceLineNo">8107</span> */<a name="line.8107"></a> +<span class="sourceLineNo">8108</span> private static long getLongValue(final Cell cell) throws DoNotRetryIOException {<a name="line.8108"></a> +<span class="sourceLineNo">8109</span> int len = cell.getValueLength();<a name="line.8109"></a> +<span class="sourceLineNo">8110</span> if (len != Bytes.SIZEOF_LONG) {<a name="line.8110"></a> +<span class="sourceLineNo">8111</span> // throw DoNotRetryIOException instead of IllegalArgumentException<a name="line.8111"></a> +<span class="sourceLineNo">8112</span> throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");<a name="line.8112"></a> +<span class="sourceLineNo">8113</span> }<a name="line.8113"></a> +<span class="sourceLineNo">8114</span> return PrivateCellUtil.getValueAsLong(cell);<a name="line.8114"></a> +<span class="sourceLineNo">8115</span> }<a name="line.8115"></a> +<span class="sourceLineNo">8116</span><a name="line.8116"></a> +<span class="sourceLineNo">8117</span> /**<a name="line.8117"></a> +<span class="sourceLineNo">8118</span> * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.<a name="line.8118"></a> +<span class="sourceLineNo">8119</span> * @param mutation Mutation we are doing this Get for.<a name="line.8119"></a> +<span class="sourceLineNo">8120</span> * @param store Which column family on row (TODO: Go all Gets in one go)<a name="line.8120"></a> +<span class="sourceLineNo">8121</span> * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.<a name="line.8121"></a> +<span class="sourceLineNo">8122</span> * @return Return list of Cells found.<a name="line.8122"></a> +<span class="sourceLineNo">8123</span> */<a name="line.8123"></a> +<span class="sourceLineNo">8124</span> private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,<a name="line.8124"></a> +<span class="sourceLineNo">8125</span> IsolationLevel isolation, TimeRange tr) throws IOException {<a name="line.8125"></a> +<span class="sourceLineNo">8126</span> // Sort the cells so that they match the order that they appear in the Get results. Otherwise,<a name="line.8126"></a> +<span class="sourceLineNo">8127</span> // we won't be able to find the existing values if the cells are not specified in order by the<a name="line.8127"></a> +<span class="sourceLineNo">8128</span> // client since cells are in an array list.<a name="line.8128"></a> +<span class="sourceLineNo">8129</span> // TODO: I don't get why we are sorting. St.Ack 20150107<a name="line.8129"></a> +<span class="sourceLineNo">8130</span> sort(coordinates, store.getComparator());<a name="line.8130"></a> +<span class="sourceLineNo">8131</span> Get get = new Get(mutation.getRow());<a name="line.8131"></a> +<span class="sourceLineNo">8132</span> if (isolation != null) {<a name="line.8132"></a> +<span class="sourceLineNo">8133</span> get.setIsolationLevel(isolation);<a name="line.8133"></a> +<span class="sourceLineNo">8134</span> }<a name="line.8134"></a> +<span class="sourceLineNo">8135</span> for (Cell cell: coordinates) {<a name="line.8135"></a> +<span class="sourceLineNo">8136</span> get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));<a name="line.8136"></a> +<span class="sourceLineNo">8137</span> }<a name="line.8137"></a> +<span class="sourceLineNo">8138</span> // Increments carry time range. If an Increment instance, put it on the Get.<a name="line.8138"></a> +<span class="sourceLineNo">8139</span> if (tr != null) {<a name="line.8139"></a> +<span class="sourceLineNo">8140</span> get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.8140"></a> +<span class="sourceLineNo">8141</span> }<a name="line.8141"></a> +<span class="sourceLineNo">8142</span> return get(get, false);<a name="line.8142"></a> +<span class="sourceLineNo">8143</span> }<a name="line.8143"></a> +<span class="sourceLineNo">8144</span><a name="line.8144"></a> +<span class="sourceLineNo">8145</span> /**<a name="line.8145"></a> +<span class="sourceLineNo">8146</span> * @return Sorted list of <code>cells</code> using <code>comparator</code><a name="line.8146"></a> +<span class="sourceLineNo">8147</span> */<a name="line.8147"></a> +<span class="sourceLineNo">8148</span> private static List<Cell> sort(List<Cell> cells, final CellComparator comparator) {<a name="line.8148"></a> +<span class="sourceLineNo">8149</span> cells.sort(comparator);<a name="line.8149"></a> +<span class="sourceLineNo">8150</span> return cells;<a name="line.8150"></a> +<span class="sourceLineNo">8151</span> }<a name="line.8151"></a> +<span class="sourceLineNo">8152</span><a name="line.8152"></a> +<span class="sourceLineNo">8153</span> public static final long FIXED_OVERHEAD = ClassSize.align(<a name="line.8153"></a> +<span class="sourceLineNo">8154</span> ClassSize.OBJECT +<a name="line.8154"></a> +<span class="sourceLineNo">8155</span> ClassSize.ARRAY +<a name="line.8155"></a> +<span class="sourceLineNo">8156</span> 55 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +<a name="line.8156"></a> +<span class="sourceLineNo">8157</span> (15 * Bytes.SIZEOF_LONG) +<a name="line.8157"></a> +<span class="sourceLineNo">8158</span> 3 * Bytes.SIZEOF_BOOLEAN);<a name="line.8158"></a> +<span class="sourceLineNo">8159</span><a name="line.8159"></a> +<span class="sourceLineNo">8160</span> // woefully out of date - currently missing:<a name="line.8160"></a> +<span class="sourceLineNo">8161</span> // 1 x HashMap - coprocessorServiceHandlers<a name="line.8161"></a> +<span class="sourceLineNo">8162</span> // 6 x LongAdder - numMutationsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.8162"></a> +<span class="sourceLineNo">8163</span> // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,<a name="line.8163"></a> +<span class="sourceLineNo">8164</span> // writeRequestsCount, cpRequestsCount<a name="line.8164"></a> +<span class="sourceLineNo">8165</span> // 1 x HRegion$WriteState - writestate<a name="line.8165"></a> +<span class="sourceLineNo">8166</span> // 1 x RegionCoprocessorHost - coprocessorHost<a name="line.8166"></a> +<span class="sourceLineNo">8167</span> // 1 x RegionSplitPolicy - splitPolicy<a name="line.8167"></a> +<span class="sourceLineNo">8168</span> // 1 x MetricsRegion - metricsRegion<a name="line.8168"></a> +<span class="sourceLineNo">8169</span> // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper<a name="line.8169"></a> +<span class="sourceLineNo">8170</span> public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +<a name="line.8170"></a> +<span class="sourceLineNo">8171</span> ClassSize.OBJECT + // closeLock<a name="line.8171"></a> +<span class="sourceLineNo">8172</span> (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing<a name="line.8172"></a> +<span class="sourceLineNo">8173</span> (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.8173"></a> +<span class="sourceLineNo">8174</span> // compactionsFailed<a name="line.8174"></a> +<span class="sourceLineNo">8175</span> (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints<a name="line.8175"></a> +<span class="sourceLineNo">8176</span> WriteState.HEAP_SIZE + // writestate<a name="line.8176"></a> +<span class="sourceLineNo">8177</span> ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores<a name="line.8177"></a> +<span class="sourceLineNo">8178</span> (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock<a name="line.8178"></a> +<span class="sourceLineNo">8179</span> MultiVersionConcurrencyControl.FIXED_SIZE // mvcc<a name="line.8179"></a> +<span class="sourceLineNo">8180</span> + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes<a name="line.8180"></a> +<span class="sourceLineNo">8181</span> + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress<a name="line.8181"></a> +<span class="sourceLineNo">8182</span> + ClassSize.STORE_SERVICES // store services<a name="line.8182"></a> +<span class="sourceLineNo">8183</span> + StoreHotnessProtector.FIXED_SIZE<a name="line.8183"></a> +<span class="sourceLineNo">8184</span> ;<a name="line.8184"></a> +<span class="sourceLineNo">8185</span><a name="line.8185"></a> +<span class="sourceLineNo">8186</span> @Override<a name="line.8186"></a> +<span class="sourceLineNo">8187</span> public long heapSize() {<a name="line.8187"></a> +<span class="sourceLineNo">8188</span> // this does not take into account row locks, recent flushes, mvcc entries, and more<a name="line.8188"></a> +<span class="sourceLineNo">8189</span> return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum();<a name="line.8189"></a> +<span class="sourceLineNo">8190</span> }<a name="line.8190"></a> +<span class="sourceLineNo">8191</span><a name="line.8191"></a> +<span class="sourceLineNo">8192</span> /**<a name="line.8192"></a> +<span class="sourceLineNo">8193</span> * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to<a name="line.8193"></a> +<span class="sourceLineNo">8194</span> * be available for handling Region#execService(com.google.protobuf.RpcController,<a name="line.8194"></a> +<span class="sourceLineNo">8195</span> * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall) calls.<a name="line.8195"></a> +<span class="sourceLineNo">8196</span> *<a name="line.8196"></a> +<span class="sourceLineNo">8197</span> * <p><a name="line.8197"></a> +<span class="sourceLineNo">8198</span> * Only a single instance may be registered per region for a given {@link Service} subclass (the<a name="line.8198"></a> +<span class="sourceLineNo">8199</span> * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.<a name="line.8199"></a> +<span class="sourceLineNo">8200</span> * After the first registration, subsequent calls with the same service name will fail with<a name="line.8200"></a> +<span class="sourceLineNo">8201</span> * a return value of {@code false}.<a name="line.8201"></a> +<span class="sourceLineNo">8202</span> * </p><a name="line.8202"></a> +<span class="sourceLineNo">8203</span> * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint<a name="line.8203"></a> +<span class="sourceLineNo">8204</span> * @return {@code true} if the registration was successful, {@code false}<a name="line.8204"></a> +<span class="sourceLineNo">8205</span> * otherwise<a name="line.8205"></a> +<span class="sourceLineNo">8206</span> */<a name="line.8206"></a> +<span class="sourceLineNo">8207</span> public boolean registerService(com.google.protobuf.Service instance) {<a name="line.8207"></a> +<span class="sourceLineNo">8208</span> /*<a name="line.8208"></a> +<span class="sourceLineNo">8209</span> * No stacking of instances is allowed for a single service name<a name="line.8209"></a> +<span class="sourceLineNo">8210</span> */<a name="line.8210"></a> +<span class="sourceLineNo">8211</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();<a name="line.8211"></a> +<span class="sourceLineNo">8212</span> String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);<a name="line.8212"></a> +<span class="sourceLineNo">8213</span> if (coprocessorServiceHandlers.containsKey(serviceName)) {<a name="line.8213"></a> +<span class="sourceLineNo">8214</span> LOG.error("Coprocessor service " + serviceName +<a name="line.8214"></a> +<span class="sourceLineNo">8215</span> " already registered, rejecting request from " + instance);<a name="line.8215"></a> +<span class="sourceLineNo">8216</span> return false;<a name="line.8216"></a> +<span class="sourceLineNo">8217</span> }<a name="line.8217"></a> +<span class="sourceLineNo">8218</span><a name="line.8218"></a> +<span class="sourceLineNo">8219</span> coprocessorServiceHandlers.put(serviceName, instance);<a name="line.8219"></a> +<span class="sourceLineNo">8220</span> if (LOG.isDebugEnabled()) {<a name="line.8220"></a> +<span class="sourceLineNo">8221</span> LOG.debug("Registered coprocessor service: region=" +<a name="line.8221"></a> +<span class="sourceLineNo">8222</span> Bytes.toStringBinary(getRegionInfo().getRegionName()) +<a name="line.8222"></a> +<span class="sourceLineNo">8223</span> " service=" + serviceName);<a name="line.8223"></a> +<span class="sourceLineNo">8224</span> }<a name="line.8224"></a> +<span class="sourceLineNo">8225</span> return true;<a name="line.8225"></a> +<span class="sourceLineNo">8226</span> }<a name="line.8226"></a> +<span class="sourceLineNo">8227</span><a name="line.8227"></a> +<span class="sourceLineNo">8228</span> /**<a name="line.8228"></a> +<span class="sourceLineNo">8229</span> * Executes a single protocol buffer coprocessor endpoint {@link Service} method using<a name="line.8229"></a> +<span class="sourceLineNo">8230</span> * the registered protocol handlers. {@link Service} implementations must be registered via the<a name="line.8230"></a> +<span class="sourceLineNo">8231</span> * {@link #registerService(com.google.protobuf.Service)}<a name="line.8231"></a> +<span class="sourceLineNo">8232</span> * method before they are available.<a name="line.8232"></a> +<span class="sourceLineNo">8233</span> *<a name="line.8233"></a> +<span class="sourceLineNo">8234</span> * @param controller an {@code RpcContoller} implementation to pass to the invoked service<a name="line.8234"></a> +<span class="sourceLineNo">8235</span> * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,<a name="line.8235"></a> +<span class="sourceLineNo">8236</span> * and parameters for the method invocation<a name="line.8236"></a> +<span class="sourceLineNo">8237</span> * @return a protocol buffer {@code Message} instance containing the method's result<a name="line.8237"></a> +<span class="sourceLineNo">8238</span> * @throws IOException if no registered service handler is found or an error<a name="line.8238"></a> +<span class="sourceLineNo">8239</span> * occurs during the invocation<a name="line.8239"></a> +<span class="sourceLineNo">8240</span> * @see #registerService(com.google.protobuf.Service)<a name="line.8240"></a> +<span class="sourceLineNo">8241</span> */<a name="line.8241"></a> +<span class="sourceLineNo">8242</span> public com.google.protobuf.Message execService(com.google.protobuf.RpcController controller,<a name="line.8242"></a> +<span class="sourceLineNo">8243</span> CoprocessorServiceCall call) throws IOException {<a name="line.8243"></a> +<span class="sourceLineNo">8244</span> String serviceName = call.getServiceName();<a name="line.8244"></a> +<span class="sourceLineNo">8245</span> com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);<a name="line.8245"></a> +<span class="sourceLineNo">8246</span> if (service == null) {<a name="line.8246"></a> +<span class="sourceLineNo">8247</span> throw new UnknownProtocolException(null, "No registered coprocessor service found for " +<a name="line.8247"></a> +<span class="sourceLineNo">8248</span> serviceName + " in region " + Bytes.toStringBinary(getRegionInfo().getRegionName()));<a name="line.8248"></a> +<span class="sourceLineNo">8249</span> }<a name="line.8249"></a> +<span class="sourceLineNo">8250</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();<a name="line.8250"></a> +<span class="sourceLineNo">8251</span><a name="line.8251"></a> +<span class="sourceLineNo">8252</span> cpRequestsCount.increment();<a name="line.8252"></a> +<span class="sourceLineNo">8253</span> String methodName = call.getMethodName();<a name="line.8253"></a> +<span class="sourceLineNo">8254</span> com.google.protobuf.Descriptors.MethodDescriptor methodDesc =<a name="line.8254"></a> +<span class="sourceLineNo">8255</span> CoprocessorRpcUtils.getMethodDescriptor(methodName, serviceDesc);<a name="line.8255"></a> +<span class="sourceLineNo">8256</span><a name="line.8256"></a> +<span class="sourceLineNo">8257</span> com.google.protobuf.Message.Builder builder =<a name="line.8257"></a> +<span class="sourceLineNo">8258</span> service.getRequestPrototype(methodDesc).newBuilderForType();<a name="line.8258"></a> +<span class="sourceLineNo">8259</span><a name="line.8259"></a> +<span class="sourceLineNo">8260</span> org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,<a name="line.8260"></a> +<span class="sourceLineNo">8261</span> call.getRequest().toByteArray());<a name="line.8261"></a> +<span class="sourceLineNo">8262</span> com.google.protobuf.Message request =<a name="line.8262"></a> +<span class="sourceLineNo">8263</span> CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());<a name="line.8263"></a> <span class="sourceLineNo">8264</span><a name="line.8264"></a> -<span class="sourceLineNo">8265</span> cpRequestsCount.increment();<a name="line.8265"></a> -<span class="sourceLineNo">8266</span> String methodName = call.getMethodName();<a name="line.8266"></a> -<span class="sourceLineNo">8267</span> com.google.protobuf.Descriptors.MethodDescriptor methodDesc =<a name="line.8267"></a> -<span class="sourceLineNo">8268</span> CoprocessorRpcUtils.getMethodDescriptor(methodName, serviceDesc);<a name="line.8268"></a> -<span class="sourceLineNo">8269</span><a name="line.8269"></a> -<span class="sourceLineNo">8270</span> com.google.protobuf.Message.Builder builder =<a name="line.8270"></a> -<span class="sourceLineNo">8271</span> service.getRequestPrototype(methodDesc).newBuilderForType();<a name="line.8271"></a> -<span class="sourceLineNo">8272</span><a name="line.8272"></a> -<span class="sourceLineNo">8273</span> org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,<a name="line.8273"></a> -<span class="sourceLineNo">8274</span> call.getRequest().toByteArray());<a name="line.8274"></a> -<span class="sourceLineNo">8275</span> com.google.protobuf.Message request =<a name="line.8275"></a> -<span class="sourceLineNo">8276</span> CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());<a name="line.8276"></a> -<span class="sourceLineNo">8277</span><a name="line.8277"></a> -<span class="sourceLineNo">8278</span> if (coprocessorHost != null) {<a name="line.8278"></a> -<span class="sourceLineNo">8279</span> request = coprocessorHost.preEndpointInvocation(service, methodName, request);<a name="line.8279"></a> -<span class="sourceLineNo">8280</span> }<a name="line.8280"></a> -<span class="sourceLineNo">8281</span><a name="line.8281"></a> -<span class="sourceLineNo">8282</span> final com.google.protobuf.Message.Builder responseBuilder =<a name="line.8282"></a> -<span class="sourceLineNo">8283</span> service.getResponsePrototype(methodDesc).newBuilderForType();<a name="line.8283"></a> -<span class="sourceLineNo">8284</span> service.callMethod(methodDesc, controller, request,<a name="line.8284"></a> -<span class="sourceLineNo">8285</span> new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {<a name="line.8285"></a> -<span class="sourceLineNo">8286</span> @Override<a name="line.8286"></a> -<span class="sourceLineNo">8287</span> public void run(com.google.protobuf.Message message) {<a name="line.8287"></a> -<span class="sourceLineNo">8288</span> if (message != null) {<a name="line.8288"></a> -<span class="sourceLineNo">8289</span> responseBuilder.mergeFrom(message);<a name="line.8289"></a> -<span class="sourceLineNo">8290</span> }<a name="line.8290"></a> -<span class="sourceLineNo">8291</span> }<a name="line.8291"></a> -<span class="sourceLineNo">8292</span> });<a name="line.8292"></a> -<span class="sourceLineNo">8293</span><a name="line.8293"></a> -<span class="sourceLineNo">8294</span> if (coprocessorHost != null) {<a name="line.8294"></a> -<span class="sourceLineNo">8295</span> coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);<a name="line.8295"></a> -<span class="sourceLineNo">8296</span> }<a name="line.8296"></a> -<span class="sourceLineNo">8297</span> IOException exception =<a name="line.8297"></a> -<span class="sourceLineNo">8298</span> org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.getControllerException(controller);<a name="line.8298"></a> -<span class="sourceLineNo">8299</span> if (exception != null) {<a name="line.8299"></a> -<span class="sourceLineNo">8300</span> throw exception;<a name="line.8300"></a> -<span class="sourceLineNo">8301</span> }<a name="line.8301"></a> -<span class="sourceLineNo">8302</span><a name="line.8302"></a> -<span class="sourceLineNo">8303</span> return responseBuilder.build();<a name="line.8303"></a> -<span class="sourceLineNo">8304</span> }<a name="line.8304"></a> -<span class="sourceLineNo">8305</span><a name="line.8305"></a> -<span class="sourceLineNo">8306</span> boolean shouldForceSplit() {<a name="line.8306"></a> -<span class="sourceLineNo">8307</span> return this.splitRequest;<a name="line.8307"></a> +<span class="sourceLineNo">8265</span> if (coprocessorHost != null) {<a name="line.8265"></a> +<span class="sourceLineNo">8266</span> request = coprocessorHost.preEndpointInvocation(service, methodName, request);<a name="line.8266"></a> +<span class="sourceLineNo">8267</span> }<a name="line.8267"></a> +<span class="sourceLineNo">8268</span><a name="line.8268"></a> +<span class="sourceLineNo">8269</span> final com.google.protobuf.Message.Builder responseBuilder =<a name="line.8269"></a> +<span class="sourceLineNo">8270</span> service.getResponsePrototype(methodDesc).newBuilderForType();<a name="line.8270"></a> +<span class="sourceLineNo">8271</span> service.callMethod(methodDesc, controller, request,<a name="line.8271"></a> +<span class="sourceLineNo">8272</span> new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {<a name="line.8272"></a> +<span class="sourceLineNo">8273</span> @Override<a name="line.8273"></a> +<span class="sourceLineNo">8274</span> public void run(com.google.protobuf.Message message) {<a name="line.8274"></a> +<span class="sourceLineNo">8275</span> if (message != null) {<a name="line.8275"></a> +<span class="sourceLineNo">8276</span> responseBuilder.mergeFrom(message);<a name="line.8276"></a> +<span class="sourceLineNo">8277</span> }<a name="line.8277"></a> +<span class="sourceLineNo">8278</span> }<a name="line.8278"></a> +<span class="sourceLineNo">8279</span> });<a name="line.8279"></a> +<span class="sourceLineNo">8280</span><a name="line.8280"></a> +<span class="sourceLineNo">8281</span> if (coprocessorHost != null) {<a name="line.8281"></a> +<span class="sourceLineNo">8282</span> coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);<a name="line.8282"></a> +<span class="sourceLineNo">8283</span> }<a name="line.8283"></a> +<span class="sourceLineNo">8284</span> IOException exception =<a name="line.8284"></a> +<span class="sourceLineNo">8285</span> org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.getControllerException(controller);<a name="line.8285"></a> +<span class="sourceLineNo">8286</span> if (exception != null) {<a name="line.8286"></a> +<span class="sourceLineNo">8287</span> throw exception;<a name="line.8287"></a> +<span class="sourceLineNo">8288</span> }<a name="line.8288"></a> +<span class="sourceLineNo">8289</span><a name="line.8289"></a> +<span class="sourceLineNo">8290</span> return responseBuilder.build();<a name="line.8290"></a> +<span class="sourceLineNo">8291</span> }<a name="line.8291"></a> +<span class="sourceLineNo">8292</span><a name="line.8292"></a> +<span class="sourceLineNo">8293</span> boolean shouldForceSplit() {<a name="line.8293"></a> +<span class="sourceLineNo">8294</span> return this.splitRequest;<a name="line.8294"></a> +<span class="sourceLineNo">8295</span> }<a name="line.8295"></a> +<span class="sourceLineNo">8296</span><a name="line.8296"></a> +<span class="sourceLineNo">8297</span> byte[] getExplicitSplitPoint() {<a name="line.8297"></a> +<span class="sourceLineNo">8298</span> return this.explicitSplitPoint;<a name="line.8298"></a> +<span class="sourceLineNo">8299</span> }<a name="line.8299"></a> +<span class="sourceLineNo">8300</span><a name="line.8300"></a> +<span class="sourceLineNo">8301</span> void forceSplit(byte[] sp) {<a name="line.8301"></a> +<span class="sourceLineNo">8302</span> // This HRegion will go away after the forced split is successful<a name="line.8302"></a> +<span class="sourceLineNo">8303</span> // But if a forced split fails, we need to clear forced split.<a name="line.8303"></a> +<span class="sourceLineNo">8304</span> this.splitRequest = true;<a name="line.8304"></a> +<span class="sourceLineNo">8305</span> if (sp != null) {<a name="line.8305"></a> +<span class="sourceLineNo">8306</span> this.explicitSplitPoint = sp;<a name="line.8306"></a> +<span class="sourceLineNo">8307</span> }<a name="line.8307"></a> <span class="sourceLineNo">8308</span> }<a name="line.8308"></a> <span class="sourceLineNo">8309</span><a name="line.8309"></a> -<span class="sourceLineNo">8310</span> byte[] getExplicitSplitPoint() {<a name="line.8310"></a> -<span class="sourceLineNo">8311</span> return this.explicitSplitPoint;<a name="line.8311"></a> -<span class="sourceLineNo">8312</span> }<a name="line.8312"></a> -<span class="sourceLineNo">8313</span><a name="line.8313"></a> -<span class="sourceLineNo">8314</span> void forceSplit(byte[] sp) {<a name="line.8314"></a> -<span class="sourceLineNo">8315</span> // This HRegion will go away after the forced split is successful<a name="line.8315"></a> -<span class="sourceLineNo">8316</span> // But if a forced split fails, we need to clear forced split.<a name="line.8316"></a> -<span class="sourceLineNo">8317</span> this.splitRequest = true;<a name="line.8317"></a> -<span class="sourceLineNo">8318</span> if (sp != null) {<a name="line.8318"></a> -<span class="sourceLineNo">8319</span> this.explicitSplitPoint = sp;<a name="line.8319"></a> -<span class="sourceLineNo">8320</span> }<a name="line.8320"></a> -<span class="sourceLineNo">8321</span> }<a name="line.8321"></a> -<span class="sourceLineNo">8322</span><a name="line.8322"></a> -<span class="sourceLineNo">8323</span> void clearSplit() {<a name="line.8323"></a> -<span class="sourceLineNo">8324</span> this.splitRequest = false;<a name="line.8324"></a> -<span class="sourceLineNo">8325</span> this.explicitSplitPoint = null;<a name="line.8325"></a> -<span class="sourceLineNo">8326</span> }<a name="line.8326"></a> -<span class="sourceLineNo">8327</span><a name="line.8327"></a> -<span class="sourceLineNo">8328</span> /**<a name="line.8328"></a> -<span class="sourceLineNo">8329</span> * Return the splitpoint. null indicates the region isn't splittable<a name="line.8329"></a> -<span class="sourceLineNo">8330</span> * If the splitpoint isn't explicitly specified, it will go over the stores<a name="line.8330"></a> -<span class="sourceLineNo">8331</span> * to find the best splitpoint. Currently the criteria of best splitpoint<a name="line.8331"></a> -<span class="sourceLineNo">8332</span> * is based on the size of the store.<a name="line.8332"></a> -<span class="sourceLineNo">8333</span> */<a name="line.8333"></a> -<span class="sourceLineNo">8334</span> public byte[] checkSplit() {<a name="line.8334"></a> -<span class="sourceLineNo">8335</span> // Can't split META<a name="line.8335"></a> -<span class="sourceLineNo">8336</span> if (this.getRegionInfo().isMetaRegion()) {<a name="line.8336"></a> -<span class="sourceLineNo">8337</span> if (shouldForceSplit()) {<a name="line.8337"></a> -<span class="sourceLineNo">8338</span> LOG.warn("Cannot split meta region in HBase 0.20 and above");<a name="line.8338"></a> -<span class="sourceLineNo">8339</span> }<a name="line.8339"></a> -<span class="sourceLineNo">8340</span> return null;<a name="line.8340"></a> -<span class="sourceLineNo">8341</span> }<a name="line.8341"></a> -<span class="sourceLineNo">8342</span><a name="line.8342"></a> -<span class="sourceLineNo">8343</span> // Can't split a region that is closing.<a name="line.8343"></a> -<span class="sourceLineNo">8344</span> if (this.isClosing()) {<a name="line.8344"></a> -<span class="sourceLineNo">8345</span> return null;<a name="line.8345"></a> -<span class="sourceLineNo">8346</span> }<a name="line.8346"></a> -<span class="sourceLineNo">8347</span><a name="line.8347"></a> -<span class="sourceLineNo">8348</span> if (!splitPolicy.shouldSplit()) {<a name="line.8348"></a> -<span class="sourceLineNo">8349</span> return null;<a name="line.8349"></a> -<span class="sourceLineNo">8350</span> }<a name="line.8350"></a> +<span class="sourceLineNo">8310</span> void clearSplit() {<a name="line.8310"></a> +<span class="sourceLineNo">8311</span> this.splitRequest = false;<a name="line.8311"></a> +<span class="sourceLineNo">8312</span> this.explicitSplitPoint = null;<a name="line.8312"></a> +<span class="sourceLineNo">8313</span> }<a name="line.8313"></a> +<span class="sourceLineNo">8314</span><a name="line.8314"></a> +<span class="sourceLineNo">8315</span> /**<a name="line.8315"></a> +<span class="sourceLineNo">8316</span> * Return the splitpoint. null indicates the region isn't splittable<a name="line.8316"></a> +<span class="sourceLineNo">8317</span> * If the splitpoint isn't explicitly specified, it will go over the stores<a name="line.8317"></a> +<span class="sourceLineNo">8318</span> * to find the best splitpoint. Currently the criteria of best splitpoint<a name="line.8318"></a> +<span class="sourceLineNo">8319</span> * is based on the size of the store.<a name="line.8319"></a> +<span class="sourceLineNo">8320</span> */<a name="line.8320"></a> +<span class="sourceLineNo">8321</span> public byte[] checkSplit() {<a name="line.8321"></a> +<span class="sourceLineNo">8322</span> // Can't split META<a name="line.8322"></a> +<span class="sourceLineNo">8323</span> if (this.getRegionInfo().isMetaRegion()) {<a name="line.8323"></a> +<span class="sourceLineNo">8324</span> if (shouldForceSplit()) {<a name="line.8324"></a> +<span class="sourceLineNo">8325</span> LOG.warn("Cannot split meta region in HBase 0.20 and above");<a name="line.8325"></a> +<span class="sourceLineNo">8326</span> }<a name="line.8326"></a> +<span class="sourceLineNo">8327</span> return null;<a name="line.8327"></a> +<span class="sourceLineNo">8328</span> }<a name="line.8328"></a> +<span class="sourceLineNo">8329</span><a name="line.8329"></a> +<span class="sourceLineNo">8330</span> // Can't split a region that is closing.<a name="line.8330"></a> +<span class="sourceLineNo">8331</span> if (this.isClosing()) {<a name="line.8331"></a> +<span class="sourceLineNo">8332</span> return null;<a name="line.8332"></a> +<span class="sourceLineNo">8333</span> }<a name="line.8333"></a> +<span class="sourceLineNo">8334</span><a name="line.8334"></a> +<span class="sourceLineNo">8335</span> if (!splitPolicy.shouldSplit()) {<a name="line.8335"></a> +<span class="sourceLineNo">8336</span> return null;<a name="line.8336"></a> +<span class="sourceLineNo">8337</span> }<a name="line.8337"></a> +<span class="sourceLineNo">8338</span><a name="line.8338"></a> +<span class="sourceLineNo">8339</span> byte[] ret = splitPolicy.getSplitPoint();<a name="line.8339"></a> +<span class="sourceLineNo">8340</span><a name="line.8340"></a> +<span class="sourceLineNo">8341</span> if (ret != null) {<a name="line.8341"></a> +<span class="sourceLineNo">8342</span> try {<a name="line.8342"></a> +<span class="sourceLineNo">8343</span> checkRow(ret, "calculated split");<a name="line.8343"></a> +<span class="sourceLineNo">8344</span> } catch (IOException e) {<a name="line.8344"></a> +<span class="sourceLineNo">8345</span> LOG.error("Ignoring invalid split", e);<a name="line.8345"></a> +<span class="sourceLineNo">8346</span> return null;<a name="line.8346"></a> +<span class="sourceLineNo">8347</span> }<a name="line.8347"></a> +<span class="sourceLineNo">8348</span> }<a name="line.8348"></a> +<span class="sourceLineNo">8349</span> return ret;<a name="line.8349"></a> +<span class="sourceLineNo">8350</span> }<a name="line.8350"></a> <span class="sourceLineNo">8351</span><a name="line.8351"></a> -<span class="sourceLineNo">8352</span> byte[] ret = splitPolicy.getSplitPoint();<a name="line.8352"></a> -<span class="sourceLineNo">8353</span><a name="line.8353"></a> -<span class="sourceLineNo">8354</span> if (ret != null) {<a name="line.8354"></a> -<span class="sourceLineNo">8355</span> try {<a name="line.8355"></a> -<span class="sourceLineNo">8356</span> checkRow(ret, "calculated split");<a name="line.8356"></a> -<span class="sourceLineNo">8357</span> } catch (IOException e) {<a name="line.8357"></a> -<span class="sourceLineNo">8358</span> LOG.error("Ignoring invalid split", e);<a name="line.8358"></a> -<span class="sourceLineNo">8359</span> return null;<a name="line.8359"></a> -<span class="sourceLineNo">8360</span> }<a name="line.8360"></a> -<span class="sourceLineNo">8361</span> }<a name="line.8361"></a> -<span class="sourceLineNo">8362</span> return ret;<a name="line.8362"></a> +<span class="sourceLineNo">8352</span> /**<a name="line.8352"></a> +<span class="sourceLineNo">8353</span> * @return The priority that this region should have in the compaction queue<a name="line.8353"></a> +<span class="sourceLineNo">8354</span> */<a name="line.8354"></a> +<span class="sourceLineNo">8355</span> public int getCompactPriority() {<a name="line.8355"></a> +<span class="sourceLineNo">8356</span> return stores.values().stream().mapToInt(HStore::getCompactPriority).min()<a name="line.8356"></a> +<span class="sourceLineNo">8357</span> .orElse(Store.NO_PRIORITY);<a name="line.8357"></a> +<span class="sourceLineNo">8358</span> }<a name="line.8358"></a> +<span class="sourceLineNo">8359</span><a name="line.8359"></a> +<span class="sourceLineNo">8360</span> /** @return the coprocessor host */<a name="line.8360"></a> +<span class="sourceLineNo">8361</span> public RegionCoprocessorHost getCoprocessorHost() {<a name="line.8361"></a> +<span class="sourceLineNo">8362</span> return coprocessorHost;<a name="line.8362"></a> <span class="sourceLineNo">8363</span> }<a name="line.8363"></a> <span class="sourceLineNo">8364</span><a name="line.8364"></a> -<span class="sourceLineNo">8365</span> /**<a name="line.8365"></a> -<span class="sourceLineNo">8366</span> * @return The priority that this region should have in the compaction queue<a name="line.8366"></a> -<span class="sourceLineNo">8367</span> */<a name="line.8367"></a> -<span class="sourceLineNo">8368</span> public int getCompactPriority() {<a name="line.8368"></a> -<span class="sourceLineNo">8369</span> return stores.values().stream().mapToInt(HStore::getCompactPriority).min()<a name="line.8369"></a> -<span class="sourceLineNo">8370</span> .orElse(Store.NO_PRIORITY);<a name="line.8370"></a> -<span class="sourceLineNo">8371</span> }<a name="line.8371"></a> -<span class="sourceLineNo">8372</span><a name="line.8372"></a> -<span class="sourceLineNo">8373</span> /** @return the coprocessor host */<a name="line.8373"></a> -<span class="sourceLineNo">8374</span> public RegionCoprocessorHost getCoprocessorHost() {<a name="line.8374"></a> -<span class="sourceLineNo">8375</span> return coprocessorHost;<a name="line.8375"></a> -<span class="sourceLineNo">8376</span> }<a name="line.8376"></a> -<span class="sourceLineNo">8377</span><a name="line.8377"></a> -<span class="sourceLineNo">8378</span> /** @param coprocessorHost the new coprocessor host */<a name="line.8378"></a> -<span class="sourceLineNo">8379</span> @VisibleForTesting<a name="line.8379"></a> -<span class="sourceLineNo">8380</span> public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {<a name="line.8380"></a> -<span class="sourceLineNo">8381</span> this.coprocessorHost = coprocessorHost;<a name="line.8381"></a> -<span class="sourceLineNo">8382</span> }<a name="line.8382"></a> -<span class="sourceLineNo">8383</span><a name="line.8383"></a> -<span class="sourceLineNo">8384</span> @Override<a name="line.8384"></a> -<span class="sourceLineNo">8385</span> public void startRegionOperation() throws IOException {<a name="line.8385"></a> -<span class="sourceLineNo">8386</span> startRegionOperation(Operation.ANY);<a name="line.8386"></a> -<span class="sourceLineNo">8387</span> }<a name="line.8387"></a> -<span class="sourceLineNo">8388</span><a name="line.8388"></a> -<span class="sourceLineNo">8389</span> @Override<a name="line.8389"></a> -<span class="sourceLineNo">8390</span> public void startRegionOperation(Operation op) throws IOException {<a name="line.8390"></a> -<span class="sourceLineNo">8391</span> switch (op) {<a name="line.8391"></a> -<span class="sourceLineNo">8392</span> case GET: // read operations<a name="line.8392"></a> -<span class="sourceLineNo">8393</span> case SCAN:<a name="line.8393"></a> -<span class="sourceLineNo">8394</span> checkReadsEnabled();<a name="line.8394"></a> -<span class="sourceLineNo">8395</span> break;<a name="line.8395"></a> -<span class="sourceLineNo">8396</span> default:<a name="line.8396"></a> -<span class="sourceLineNo">8397</span> break;<a name="line.8397"></a> -<span class="sourceLineNo">8398</span> }<a name="line.8398"></a> -<span class="sourceLineNo">8399</span> if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION<a name="line.8399"></a> -<span class="sourceLineNo">8400</span> || op == Operation.COMPACT_REGION || op == Operation.COMPACT_SWITCH) {<a name="line.8400"></a> -<span class="sourceLineNo">8401</span> // split, merge or compact region doesn't need to check the closing/closed state or lock the<a name="line.8401"></a> -<span class="sourceLineNo">8402</span> // region<a name="line.8402"></a> -<span class="sourceLineNo">8403</span> return;<a name="line.8403"></a> +<span class="sourceLineNo">8365</span> /** @param coprocessorHost the new coprocessor host */<a name="line.8365"></a> +<span class="sourceLineNo">8366</span> @VisibleForTesting<a name="line.8366"></a> +<span class="sourceLineNo">8367</span> public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {<a name="line.8367"></a> +<span class="sourceLineNo">8368</span> this.coprocessorHost = coprocessorHost;<a name="line.8368"></a> +<span class="sourceLineNo">8369</span> }<a name="line.8369"></a> +<span class="sourceLineNo">8370</span><a name="line.8370"></a> +<span class="sourceLineNo">8371</span> @Override<a name="line.8371"></a> +<span class="sourceLineNo">8372</span> public void startRegionOperation() throws IOException {<a name="line.8372"></a> +<span class="sourceLineNo">8373</span> startRegionOperation(Operation.ANY);<a name="line.8373"></a> +<span class="sourceLineNo">8374</span> }<a name="line.8374"></a> +<span class="sourceLineNo">8375</span><a name="line.8375"></a> +<span class="sourceLineNo">8376</span> @Override<a name="line.8376"></a
<TRUNCATED>