http://git-wip-us.apache.org/repos/asf/hbase-site/blob/794df1af/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html index 690d0b7..2e312a0 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html @@ -6974,1186 +6974,1187 @@ <span class="sourceLineNo">6966</span> @Override<a name="line.6966"></a> <span class="sourceLineNo">6967</span> public void mutateRowsWithLocks(Collection<Mutation> mutations,<a name="line.6967"></a> <span class="sourceLineNo">6968</span> Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {<a name="line.6968"></a> -<span class="sourceLineNo">6969</span> MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);<a name="line.6969"></a> -<span class="sourceLineNo">6970</span> processRowsWithLocks(proc, -1, nonceGroup, nonce);<a name="line.6970"></a> -<span class="sourceLineNo">6971</span> }<a name="line.6971"></a> -<span class="sourceLineNo">6972</span><a name="line.6972"></a> -<span class="sourceLineNo">6973</span> /**<a name="line.6973"></a> -<span class="sourceLineNo">6974</span> * @return statistics about the current load of the region<a name="line.6974"></a> -<span class="sourceLineNo">6975</span> */<a name="line.6975"></a> -<span class="sourceLineNo">6976</span> public ClientProtos.RegionLoadStats getLoadStatistics() {<a name="line.6976"></a> -<span class="sourceLineNo">6977</span> if (!regionStatsEnabled) {<a name="line.6977"></a> -<span class="sourceLineNo">6978</span> return null;<a name="line.6978"></a> -<span class="sourceLineNo">6979</span> }<a name="line.6979"></a> -<span class="sourceLineNo">6980</span> ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();<a name="line.6980"></a> -<span class="sourceLineNo">6981</span> stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this<a name="line.6981"></a> -<span class="sourceLineNo">6982</span> .memstoreFlushSize)));<a name="line.6982"></a> -<span class="sourceLineNo">6983</span> if (rsServices.getHeapMemoryManager() != null) {<a name="line.6983"></a> -<span class="sourceLineNo">6984</span> // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,<a name="line.6984"></a> -<span class="sourceLineNo">6985</span> // so we could just do the calculation below and we'll get a 0.<a name="line.6985"></a> -<span class="sourceLineNo">6986</span> // treating it as a special case analogous to no HMM instead so that it can be<a name="line.6986"></a> -<span class="sourceLineNo">6987</span> // programatically treated different from using <1% of heap.<a name="line.6987"></a> -<span class="sourceLineNo">6988</span> final float occupancy = rsServices.getHeapMemoryManager().getHeapOccupancyPercent();<a name="line.6988"></a> -<span class="sourceLineNo">6989</span> if (occupancy != HeapMemoryManager.HEAP_OCCUPANCY_ERROR_VALUE) {<a name="line.6989"></a> -<span class="sourceLineNo">6990</span> stats.setHeapOccupancy((int)(occupancy * 100));<a name="line.6990"></a> -<span class="sourceLineNo">6991</span> }<a name="line.6991"></a> -<span class="sourceLineNo">6992</span> }<a name="line.6992"></a> -<span class="sourceLineNo">6993</span> stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :<a name="line.6993"></a> -<span class="sourceLineNo">6994</span> (int)rsServices.getCompactionPressure()*100);<a name="line.6994"></a> -<span class="sourceLineNo">6995</span> return stats.build();<a name="line.6995"></a> -<span class="sourceLineNo">6996</span> }<a name="line.6996"></a> -<span class="sourceLineNo">6997</span><a name="line.6997"></a> -<span class="sourceLineNo">6998</span> @Override<a name="line.6998"></a> -<span class="sourceLineNo">6999</span> public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {<a name="line.6999"></a> -<span class="sourceLineNo">7000</span> processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,<a name="line.7000"></a> -<span class="sourceLineNo">7001</span> HConstants.NO_NONCE);<a name="line.7001"></a> -<span class="sourceLineNo">7002</span> }<a name="line.7002"></a> -<span class="sourceLineNo">7003</span><a name="line.7003"></a> -<span class="sourceLineNo">7004</span> @Override<a name="line.7004"></a> -<span class="sourceLineNo">7005</span> public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)<a name="line.7005"></a> -<span class="sourceLineNo">7006</span> throws IOException {<a name="line.7006"></a> -<span class="sourceLineNo">7007</span> processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);<a name="line.7007"></a> -<span class="sourceLineNo">7008</span> }<a name="line.7008"></a> -<span class="sourceLineNo">7009</span><a name="line.7009"></a> -<span class="sourceLineNo">7010</span> @Override<a name="line.7010"></a> -<span class="sourceLineNo">7011</span> public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,<a name="line.7011"></a> -<span class="sourceLineNo">7012</span> long nonceGroup, long nonce) throws IOException {<a name="line.7012"></a> -<span class="sourceLineNo">7013</span> for (byte[] row : processor.getRowsToLock()) {<a name="line.7013"></a> -<span class="sourceLineNo">7014</span> checkRow(row, "processRowsWithLocks");<a name="line.7014"></a> -<span class="sourceLineNo">7015</span> }<a name="line.7015"></a> -<span class="sourceLineNo">7016</span> if (!processor.readOnly()) {<a name="line.7016"></a> -<span class="sourceLineNo">7017</span> checkReadOnly();<a name="line.7017"></a> -<span class="sourceLineNo">7018</span> }<a name="line.7018"></a> -<span class="sourceLineNo">7019</span> checkResources();<a name="line.7019"></a> -<span class="sourceLineNo">7020</span> startRegionOperation();<a name="line.7020"></a> -<span class="sourceLineNo">7021</span> WALEdit walEdit = new WALEdit();<a name="line.7021"></a> -<span class="sourceLineNo">7022</span><a name="line.7022"></a> -<span class="sourceLineNo">7023</span> // STEP 1. Run pre-process hook<a name="line.7023"></a> -<span class="sourceLineNo">7024</span> preProcess(processor, walEdit);<a name="line.7024"></a> -<span class="sourceLineNo">7025</span> // Short circuit the read only case<a name="line.7025"></a> -<span class="sourceLineNo">7026</span> if (processor.readOnly()) {<a name="line.7026"></a> -<span class="sourceLineNo">7027</span> try {<a name="line.7027"></a> -<span class="sourceLineNo">7028</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7028"></a> -<span class="sourceLineNo">7029</span> doProcessRowWithTimeout(processor, now, this, null, null, timeout);<a name="line.7029"></a> -<span class="sourceLineNo">7030</span> processor.postProcess(this, walEdit, true);<a name="line.7030"></a> -<span class="sourceLineNo">7031</span> } finally {<a name="line.7031"></a> -<span class="sourceLineNo">7032</span> closeRegionOperation();<a name="line.7032"></a> -<span class="sourceLineNo">7033</span> }<a name="line.7033"></a> -<span class="sourceLineNo">7034</span> return;<a name="line.7034"></a> -<span class="sourceLineNo">7035</span> }<a name="line.7035"></a> -<span class="sourceLineNo">7036</span><a name="line.7036"></a> -<span class="sourceLineNo">7037</span> boolean locked = false;<a name="line.7037"></a> -<span class="sourceLineNo">7038</span> List<RowLock> acquiredRowLocks = null;<a name="line.7038"></a> -<span class="sourceLineNo">7039</span> List<Mutation> mutations = new ArrayList<>();<a name="line.7039"></a> -<span class="sourceLineNo">7040</span> Collection<byte[]> rowsToLock = processor.getRowsToLock();<a name="line.7040"></a> -<span class="sourceLineNo">7041</span> // This is assigned by mvcc either explicity in the below or in the guts of the WAL append<a name="line.7041"></a> -<span class="sourceLineNo">7042</span> // when it assigns the edit a sequencedid (A.K.A the mvcc write number).<a name="line.7042"></a> -<span class="sourceLineNo">7043</span> WriteEntry writeEntry = null;<a name="line.7043"></a> -<span class="sourceLineNo">7044</span> MemstoreSize memstoreSize = new MemstoreSize();<a name="line.7044"></a> -<span class="sourceLineNo">7045</span> try {<a name="line.7045"></a> -<span class="sourceLineNo">7046</span> boolean success = false;<a name="line.7046"></a> -<span class="sourceLineNo">7047</span> try {<a name="line.7047"></a> -<span class="sourceLineNo">7048</span> // STEP 2. Acquire the row lock(s)<a name="line.7048"></a> -<span class="sourceLineNo">7049</span> acquiredRowLocks = new ArrayList<>(rowsToLock.size());<a name="line.7049"></a> -<span class="sourceLineNo">7050</span> for (byte[] row : rowsToLock) {<a name="line.7050"></a> -<span class="sourceLineNo">7051</span> // Attempt to lock all involved rows, throw if any lock times out<a name="line.7051"></a> -<span class="sourceLineNo">7052</span> // use a writer lock for mixed reads and writes<a name="line.7052"></a> -<span class="sourceLineNo">7053</span> acquiredRowLocks.add(getRowLockInternal(row, false));<a name="line.7053"></a> -<span class="sourceLineNo">7054</span> }<a name="line.7054"></a> -<span class="sourceLineNo">7055</span> // STEP 3. Region lock<a name="line.7055"></a> -<span class="sourceLineNo">7056</span> lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());<a name="line.7056"></a> -<span class="sourceLineNo">7057</span> locked = true;<a name="line.7057"></a> -<span class="sourceLineNo">7058</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7058"></a> -<span class="sourceLineNo">7059</span> // STEP 4. Let the processor scan the rows, generate mutations and add waledits<a name="line.7059"></a> -<span class="sourceLineNo">7060</span> doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);<a name="line.7060"></a> -<span class="sourceLineNo">7061</span> if (!mutations.isEmpty()) {<a name="line.7061"></a> -<span class="sourceLineNo">7062</span> // STEP 5. Call the preBatchMutate hook<a name="line.7062"></a> -<span class="sourceLineNo">7063</span> processor.preBatchMutate(this, walEdit);<a name="line.7063"></a> -<span class="sourceLineNo">7064</span><a name="line.7064"></a> -<span class="sourceLineNo">7065</span> // STEP 6. Append and sync if walEdit has data to write out.<a name="line.7065"></a> -<span class="sourceLineNo">7066</span> if (!walEdit.isEmpty()) {<a name="line.7066"></a> -<span class="sourceLineNo">7067</span> writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),<a name="line.7067"></a> -<span class="sourceLineNo">7068</span> processor.getClusterIds(), now, nonceGroup, nonce);<a name="line.7068"></a> -<span class="sourceLineNo">7069</span> } else {<a name="line.7069"></a> -<span class="sourceLineNo">7070</span> // We are here if WAL is being skipped.<a name="line.7070"></a> -<span class="sourceLineNo">7071</span> writeEntry = this.mvcc.begin();<a name="line.7071"></a> -<span class="sourceLineNo">7072</span> }<a name="line.7072"></a> -<span class="sourceLineNo">7073</span><a name="line.7073"></a> -<span class="sourceLineNo">7074</span> // STEP 7. Apply to memstore<a name="line.7074"></a> -<span class="sourceLineNo">7075</span> long sequenceId = writeEntry.getWriteNumber();<a name="line.7075"></a> -<span class="sourceLineNo">7076</span> for (Mutation m : mutations) {<a name="line.7076"></a> -<span class="sourceLineNo">7077</span> // Handle any tag based cell features.<a name="line.7077"></a> -<span class="sourceLineNo">7078</span> // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before<a name="line.7078"></a> -<span class="sourceLineNo">7079</span> // so tags go into WAL?<a name="line.7079"></a> -<span class="sourceLineNo">7080</span> rewriteCellTags(m.getFamilyCellMap(), m);<a name="line.7080"></a> -<span class="sourceLineNo">7081</span> for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {<a name="line.7081"></a> -<span class="sourceLineNo">7082</span> Cell cell = cellScanner.current();<a name="line.7082"></a> -<span class="sourceLineNo">7083</span> if (walEdit.isEmpty()) {<a name="line.7083"></a> -<span class="sourceLineNo">7084</span> // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.<a name="line.7084"></a> -<span class="sourceLineNo">7085</span> // If no WAL, need to stamp it here.<a name="line.7085"></a> -<span class="sourceLineNo">7086</span> CellUtil.setSequenceId(cell, sequenceId);<a name="line.7086"></a> -<span class="sourceLineNo">7087</span> }<a name="line.7087"></a> -<span class="sourceLineNo">7088</span> applyToMemstore(getHStore(cell), cell, memstoreSize);<a name="line.7088"></a> -<span class="sourceLineNo">7089</span> }<a name="line.7089"></a> -<span class="sourceLineNo">7090</span> }<a name="line.7090"></a> -<span class="sourceLineNo">7091</span><a name="line.7091"></a> -<span class="sourceLineNo">7092</span> // STEP 8. call postBatchMutate hook<a name="line.7092"></a> -<span class="sourceLineNo">7093</span> processor.postBatchMutate(this);<a name="line.7093"></a> -<span class="sourceLineNo">7094</span><a name="line.7094"></a> -<span class="sourceLineNo">7095</span> // STEP 9. Complete mvcc.<a name="line.7095"></a> -<span class="sourceLineNo">7096</span> mvcc.completeAndWait(writeEntry);<a name="line.7096"></a> -<span class="sourceLineNo">7097</span> writeEntry = null;<a name="line.7097"></a> -<span class="sourceLineNo">7098</span><a name="line.7098"></a> -<span class="sourceLineNo">7099</span> // STEP 10. Release region lock<a name="line.7099"></a> -<span class="sourceLineNo">7100</span> if (locked) {<a name="line.7100"></a> -<span class="sourceLineNo">7101</span> this.updatesLock.readLock().unlock();<a name="line.7101"></a> -<span class="sourceLineNo">7102</span> locked = false;<a name="line.7102"></a> -<span class="sourceLineNo">7103</span> }<a name="line.7103"></a> -<span class="sourceLineNo">7104</span><a name="line.7104"></a> -<span class="sourceLineNo">7105</span> // STEP 11. Release row lock(s)<a name="line.7105"></a> -<span class="sourceLineNo">7106</span> releaseRowLocks(acquiredRowLocks);<a name="line.7106"></a> -<span class="sourceLineNo">7107</span> }<a name="line.7107"></a> -<span class="sourceLineNo">7108</span> success = true;<a name="line.7108"></a> -<span class="sourceLineNo">7109</span> } finally {<a name="line.7109"></a> -<span class="sourceLineNo">7110</span> // Call complete rather than completeAndWait because we probably had error if walKey != null<a name="line.7110"></a> -<span class="sourceLineNo">7111</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7111"></a> -<span class="sourceLineNo">7112</span> if (locked) {<a name="line.7112"></a> -<span class="sourceLineNo">7113</span> this.updatesLock.readLock().unlock();<a name="line.7113"></a> -<span class="sourceLineNo">7114</span> }<a name="line.7114"></a> -<span class="sourceLineNo">7115</span> // release locks if some were acquired but another timed out<a name="line.7115"></a> -<span class="sourceLineNo">7116</span> releaseRowLocks(acquiredRowLocks);<a name="line.7116"></a> -<span class="sourceLineNo">7117</span> }<a name="line.7117"></a> -<span class="sourceLineNo">7118</span><a name="line.7118"></a> -<span class="sourceLineNo">7119</span> // 12. Run post-process hook<a name="line.7119"></a> -<span class="sourceLineNo">7120</span> processor.postProcess(this, walEdit, success);<a name="line.7120"></a> -<span class="sourceLineNo">7121</span> } finally {<a name="line.7121"></a> -<span class="sourceLineNo">7122</span> closeRegionOperation();<a name="line.7122"></a> -<span class="sourceLineNo">7123</span> if (!mutations.isEmpty()) {<a name="line.7123"></a> -<span class="sourceLineNo">7124</span> long newSize = this.addAndGetMemstoreSize(memstoreSize);<a name="line.7124"></a> -<span class="sourceLineNo">7125</span> requestFlushIfNeeded(newSize);<a name="line.7125"></a> -<span class="sourceLineNo">7126</span> }<a name="line.7126"></a> -<span class="sourceLineNo">7127</span> }<a name="line.7127"></a> -<span class="sourceLineNo">7128</span> }<a name="line.7128"></a> -<span class="sourceLineNo">7129</span><a name="line.7129"></a> -<span class="sourceLineNo">7130</span> private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)<a name="line.7130"></a> -<span class="sourceLineNo">7131</span> throws IOException {<a name="line.7131"></a> -<span class="sourceLineNo">7132</span> try {<a name="line.7132"></a> -<span class="sourceLineNo">7133</span> processor.preProcess(this, walEdit);<a name="line.7133"></a> -<span class="sourceLineNo">7134</span> } catch (IOException e) {<a name="line.7134"></a> -<span class="sourceLineNo">7135</span> closeRegionOperation();<a name="line.7135"></a> -<span class="sourceLineNo">7136</span> throw e;<a name="line.7136"></a> -<span class="sourceLineNo">7137</span> }<a name="line.7137"></a> -<span class="sourceLineNo">7138</span> }<a name="line.7138"></a> -<span class="sourceLineNo">7139</span><a name="line.7139"></a> -<span class="sourceLineNo">7140</span> private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,<a name="line.7140"></a> -<span class="sourceLineNo">7141</span> final long now,<a name="line.7141"></a> -<span class="sourceLineNo">7142</span> final HRegion region,<a name="line.7142"></a> -<span class="sourceLineNo">7143</span> final List<Mutation> mutations,<a name="line.7143"></a> -<span class="sourceLineNo">7144</span> final WALEdit walEdit,<a name="line.7144"></a> -<span class="sourceLineNo">7145</span> final long timeout) throws IOException {<a name="line.7145"></a> -<span class="sourceLineNo">7146</span> // Short circuit the no time bound case.<a name="line.7146"></a> -<span class="sourceLineNo">7147</span> if (timeout < 0) {<a name="line.7147"></a> -<span class="sourceLineNo">7148</span> try {<a name="line.7148"></a> -<span class="sourceLineNo">7149</span> processor.process(now, region, mutations, walEdit);<a name="line.7149"></a> -<span class="sourceLineNo">7150</span> } catch (IOException e) {<a name="line.7150"></a> -<span class="sourceLineNo">7151</span> LOG.warn("RowProcessor:" + processor.getClass().getName() +<a name="line.7151"></a> -<span class="sourceLineNo">7152</span> " throws Exception on row(s):" +<a name="line.7152"></a> -<span class="sourceLineNo">7153</span> Bytes.toStringBinary(<a name="line.7153"></a> -<span class="sourceLineNo">7154</span> processor.getRowsToLock().iterator().next()) + "...", e);<a name="line.7154"></a> -<span class="sourceLineNo">7155</span> throw e;<a name="line.7155"></a> -<span class="sourceLineNo">7156</span> }<a name="line.7156"></a> -<span class="sourceLineNo">7157</span> return;<a name="line.7157"></a> -<span class="sourceLineNo">7158</span> }<a name="line.7158"></a> -<span class="sourceLineNo">7159</span><a name="line.7159"></a> -<span class="sourceLineNo">7160</span> // Case with time bound<a name="line.7160"></a> -<span class="sourceLineNo">7161</span> FutureTask<Void> task = new FutureTask<>(new Callable<Void>() {<a name="line.7161"></a> -<span class="sourceLineNo">7162</span> @Override<a name="line.7162"></a> -<span class="sourceLineNo">7163</span> public Void call() throws IOException {<a name="line.7163"></a> -<span class="sourceLineNo">7164</span> try {<a name="line.7164"></a> -<span class="sourceLineNo">7165</span> processor.process(now, region, mutations, walEdit);<a name="line.7165"></a> -<span class="sourceLineNo">7166</span> return null;<a name="line.7166"></a> -<span class="sourceLineNo">7167</span> } catch (IOException e) {<a name="line.7167"></a> -<span class="sourceLineNo">7168</span> LOG.warn("RowProcessor:" + processor.getClass().getName() +<a name="line.7168"></a> -<span class="sourceLineNo">7169</span> " throws Exception on row(s):" +<a name="line.7169"></a> -<span class="sourceLineNo">7170</span> Bytes.toStringBinary(<a name="line.7170"></a> -<span class="sourceLineNo">7171</span> processor.getRowsToLock().iterator().next()) + "...", e);<a name="line.7171"></a> -<span class="sourceLineNo">7172</span> throw e;<a name="line.7172"></a> -<span class="sourceLineNo">7173</span> }<a name="line.7173"></a> -<span class="sourceLineNo">7174</span> }<a name="line.7174"></a> -<span class="sourceLineNo">7175</span> });<a name="line.7175"></a> -<span class="sourceLineNo">7176</span> rowProcessorExecutor.execute(task);<a name="line.7176"></a> -<span class="sourceLineNo">7177</span> try {<a name="line.7177"></a> -<span class="sourceLineNo">7178</span> task.get(timeout, TimeUnit.MILLISECONDS);<a name="line.7178"></a> -<span class="sourceLineNo">7179</span> } catch (TimeoutException te) {<a name="line.7179"></a> -<span class="sourceLineNo">7180</span> LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +<a name="line.7180"></a> -<span class="sourceLineNo">7181</span> Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +<a name="line.7181"></a> -<span class="sourceLineNo">7182</span> "...");<a name="line.7182"></a> -<span class="sourceLineNo">7183</span> throw new IOException(te);<a name="line.7183"></a> -<span class="sourceLineNo">7184</span> } catch (Exception e) {<a name="line.7184"></a> -<span class="sourceLineNo">7185</span> throw new IOException(e);<a name="line.7185"></a> -<span class="sourceLineNo">7186</span> }<a name="line.7186"></a> -<span class="sourceLineNo">7187</span> }<a name="line.7187"></a> -<span class="sourceLineNo">7188</span><a name="line.7188"></a> -<span class="sourceLineNo">7189</span> public Result append(Append append) throws IOException {<a name="line.7189"></a> -<span class="sourceLineNo">7190</span> return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.7190"></a> -<span class="sourceLineNo">7191</span> }<a name="line.7191"></a> -<span class="sourceLineNo">7192</span><a name="line.7192"></a> -<span class="sourceLineNo">7193</span> @Override<a name="line.7193"></a> -<span class="sourceLineNo">7194</span> public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {<a name="line.7194"></a> -<span class="sourceLineNo">7195</span> return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());<a name="line.7195"></a> -<span class="sourceLineNo">7196</span> }<a name="line.7196"></a> -<span class="sourceLineNo">7197</span><a name="line.7197"></a> -<span class="sourceLineNo">7198</span> public Result increment(Increment increment) throws IOException {<a name="line.7198"></a> -<span class="sourceLineNo">7199</span> return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.7199"></a> -<span class="sourceLineNo">7200</span> }<a name="line.7200"></a> -<span class="sourceLineNo">7201</span><a name="line.7201"></a> -<span class="sourceLineNo">7202</span> @Override<a name="line.7202"></a> -<span class="sourceLineNo">7203</span> public Result increment(Increment mutation, long nonceGroup, long nonce)<a name="line.7203"></a> -<span class="sourceLineNo">7204</span> throws IOException {<a name="line.7204"></a> -<span class="sourceLineNo">7205</span> return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());<a name="line.7205"></a> -<span class="sourceLineNo">7206</span> }<a name="line.7206"></a> -<span class="sourceLineNo">7207</span><a name="line.7207"></a> -<span class="sourceLineNo">7208</span> /**<a name="line.7208"></a> -<span class="sourceLineNo">7209</span> * Add "deltas" to Cells. Deltas are increments or appends. Switch on <code>op</code>.<a name="line.7209"></a> -<span class="sourceLineNo">7210</span> *<a name="line.7210"></a> -<span class="sourceLineNo">7211</span> * <p>If increment, add deltas to current values or if an append, then<a name="line.7211"></a> -<span class="sourceLineNo">7212</span> * append the deltas to the current Cell values.<a name="line.7212"></a> -<span class="sourceLineNo">7213</span> *<a name="line.7213"></a> -<span class="sourceLineNo">7214</span> * <p>Append and Increment code paths are mostly the same. They differ in just a few places.<a name="line.7214"></a> -<span class="sourceLineNo">7215</span> * This method does the code path for increment and append and then in key spots, switches<a name="line.7215"></a> -<span class="sourceLineNo">7216</span> * on the passed in <code>op</code> to do increment or append specific paths.<a name="line.7216"></a> -<span class="sourceLineNo">7217</span> */<a name="line.7217"></a> -<span class="sourceLineNo">7218</span> private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce,<a name="line.7218"></a> -<span class="sourceLineNo">7219</span> boolean returnResults) throws IOException {<a name="line.7219"></a> -<span class="sourceLineNo">7220</span> checkReadOnly();<a name="line.7220"></a> -<span class="sourceLineNo">7221</span> checkResources();<a name="line.7221"></a> -<span class="sourceLineNo">7222</span> checkRow(mutation.getRow(), op.toString());<a name="line.7222"></a> -<span class="sourceLineNo">7223</span> checkFamilies(mutation.getFamilyCellMap().keySet());<a name="line.7223"></a> -<span class="sourceLineNo">7224</span> this.writeRequestsCount.increment();<a name="line.7224"></a> -<span class="sourceLineNo">7225</span> WriteEntry writeEntry = null;<a name="line.7225"></a> -<span class="sourceLineNo">7226</span> startRegionOperation(op);<a name="line.7226"></a> -<span class="sourceLineNo">7227</span> List<Cell> results = returnResults? new ArrayList<>(mutation.size()): null;<a name="line.7227"></a> -<span class="sourceLineNo">7228</span> RowLock rowLock = null;<a name="line.7228"></a> -<span class="sourceLineNo">7229</span> MemstoreSize memstoreSize = new MemstoreSize();<a name="line.7229"></a> -<span class="sourceLineNo">7230</span> try {<a name="line.7230"></a> -<span class="sourceLineNo">7231</span> rowLock = getRowLockInternal(mutation.getRow(), false);<a name="line.7231"></a> -<span class="sourceLineNo">7232</span> lock(this.updatesLock.readLock());<a name="line.7232"></a> -<span class="sourceLineNo">7233</span> try {<a name="line.7233"></a> -<span class="sourceLineNo">7234</span> Result cpResult = doCoprocessorPreCall(op, mutation);<a name="line.7234"></a> -<span class="sourceLineNo">7235</span> if (cpResult != null) {<a name="line.7235"></a> -<span class="sourceLineNo">7236</span> return returnResults? cpResult: null;<a name="line.7236"></a> -<span class="sourceLineNo">7237</span> }<a name="line.7237"></a> -<span class="sourceLineNo">7238</span> Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());<a name="line.7238"></a> -<span class="sourceLineNo">7239</span> Map<Store, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());<a name="line.7239"></a> -<span class="sourceLineNo">7240</span> // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and<a name="line.7240"></a> -<span class="sourceLineNo">7241</span> // what to return back to the client (in 'forMemStore' and 'results' respectively).<a name="line.7241"></a> -<span class="sourceLineNo">7242</span> WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);<a name="line.7242"></a> -<span class="sourceLineNo">7243</span> // Actually write to WAL now if a walEdit to apply.<a name="line.7243"></a> -<span class="sourceLineNo">7244</span> if (walEdit != null && !walEdit.isEmpty()) {<a name="line.7244"></a> -<span class="sourceLineNo">7245</span> writeEntry = doWALAppend(walEdit, effectiveDurability, nonceGroup, nonce);<a name="line.7245"></a> -<span class="sourceLineNo">7246</span> } else {<a name="line.7246"></a> -<span class="sourceLineNo">7247</span> // If walEdits is empty, it means we skipped the WAL; update LongAdders and start an mvcc<a name="line.7247"></a> -<span class="sourceLineNo">7248</span> // transaction.<a name="line.7248"></a> -<span class="sourceLineNo">7249</span> recordMutationWithoutWal(mutation.getFamilyCellMap());<a name="line.7249"></a> -<span class="sourceLineNo">7250</span> writeEntry = mvcc.begin();<a name="line.7250"></a> -<span class="sourceLineNo">7251</span> updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());<a name="line.7251"></a> -<span class="sourceLineNo">7252</span> }<a name="line.7252"></a> -<span class="sourceLineNo">7253</span> // Now write to MemStore. Do it a column family at a time.<a name="line.7253"></a> -<span class="sourceLineNo">7254</span> for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {<a name="line.7254"></a> -<span class="sourceLineNo">7255</span> applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);<a name="line.7255"></a> -<span class="sourceLineNo">7256</span> }<a name="line.7256"></a> -<span class="sourceLineNo">7257</span> mvcc.completeAndWait(writeEntry);<a name="line.7257"></a> -<span class="sourceLineNo">7258</span> if (rsServices != null && rsServices.getNonceManager() != null) {<a name="line.7258"></a> -<span class="sourceLineNo">7259</span> rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce,<a name="line.7259"></a> -<span class="sourceLineNo">7260</span> writeEntry.getWriteNumber());<a name="line.7260"></a> -<span class="sourceLineNo">7261</span> }<a name="line.7261"></a> -<span class="sourceLineNo">7262</span> writeEntry = null;<a name="line.7262"></a> -<span class="sourceLineNo">7263</span> } finally {<a name="line.7263"></a> -<span class="sourceLineNo">7264</span> this.updatesLock.readLock().unlock();<a name="line.7264"></a> -<span class="sourceLineNo">7265</span> }<a name="line.7265"></a> -<span class="sourceLineNo">7266</span> // If results is null, then client asked that we not return the calculated results.<a name="line.7266"></a> -<span class="sourceLineNo">7267</span> return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;<a name="line.7267"></a> -<span class="sourceLineNo">7268</span> } finally {<a name="line.7268"></a> -<span class="sourceLineNo">7269</span> // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes<a name="line.7269"></a> -<span class="sourceLineNo">7270</span> // to get current value under an exclusive lock so no need so no need to wait to return to<a name="line.7270"></a> -<span class="sourceLineNo">7271</span> // the client. Means only way to read-your-own-increment or append is to come in with an<a name="line.7271"></a> -<span class="sourceLineNo">7272</span> // a 0 increment.<a name="line.7272"></a> -<span class="sourceLineNo">7273</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7273"></a> -<span class="sourceLineNo">7274</span> if (rowLock != null) {<a name="line.7274"></a> -<span class="sourceLineNo">7275</span> rowLock.release();<a name="line.7275"></a> -<span class="sourceLineNo">7276</span> }<a name="line.7276"></a> -<span class="sourceLineNo">7277</span> // Request a cache flush if over the limit. Do it outside update lock.<a name="line.7277"></a> -<span class="sourceLineNo">7278</span> if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {<a name="line.7278"></a> -<span class="sourceLineNo">7279</span> requestFlush();<a name="line.7279"></a> -<span class="sourceLineNo">7280</span> }<a name="line.7280"></a> -<span class="sourceLineNo">7281</span> closeRegionOperation(op);<a name="line.7281"></a> -<span class="sourceLineNo">7282</span> if (this.metricsRegion != null) {<a name="line.7282"></a> -<span class="sourceLineNo">7283</span> switch (op) {<a name="line.7283"></a> -<span class="sourceLineNo">7284</span> case INCREMENT:<a name="line.7284"></a> -<span class="sourceLineNo">7285</span> this.metricsRegion.updateIncrement();<a name="line.7285"></a> -<span class="sourceLineNo">7286</span> break;<a name="line.7286"></a> -<span class="sourceLineNo">7287</span> case APPEND:<a name="line.7287"></a> -<span class="sourceLineNo">7288</span> this.metricsRegion.updateAppend();<a name="line.7288"></a> -<span class="sourceLineNo">7289</span> break;<a name="line.7289"></a> -<span class="sourceLineNo">7290</span> default:<a name="line.7290"></a> -<span class="sourceLineNo">7291</span> break;<a name="line.7291"></a> -<span class="sourceLineNo">7292</span> }<a name="line.7292"></a> -<span class="sourceLineNo">7293</span> }<a name="line.7293"></a> -<span class="sourceLineNo">7294</span> }<a name="line.7294"></a> -<span class="sourceLineNo">7295</span> }<a name="line.7295"></a> -<span class="sourceLineNo">7296</span><a name="line.7296"></a> -<span class="sourceLineNo">7297</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup,<a name="line.7297"></a> -<span class="sourceLineNo">7298</span> long nonce)<a name="line.7298"></a> -<span class="sourceLineNo">7299</span> throws IOException {<a name="line.7299"></a> -<span class="sourceLineNo">7300</span> return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(),<a name="line.7300"></a> -<span class="sourceLineNo">7301</span> nonceGroup, nonce);<a name="line.7301"></a> -<span class="sourceLineNo">7302</span> }<a name="line.7302"></a> -<span class="sourceLineNo">7303</span><a name="line.7303"></a> -<span class="sourceLineNo">7304</span> /**<a name="line.7304"></a> -<span class="sourceLineNo">7305</span> * @return writeEntry associated with this append<a name="line.7305"></a> -<span class="sourceLineNo">7306</span> */<a name="line.7306"></a> -<span class="sourceLineNo">7307</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,<a name="line.7307"></a> -<span class="sourceLineNo">7308</span> long now, long nonceGroup, long nonce)<a name="line.7308"></a> -<span class="sourceLineNo">7309</span> throws IOException {<a name="line.7309"></a> -<span class="sourceLineNo">7310</span> WriteEntry writeEntry = null;<a name="line.7310"></a> -<span class="sourceLineNo">7311</span> // Using default cluster id, as this can only happen in the originating cluster.<a name="line.7311"></a> -<span class="sourceLineNo">7312</span> // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey<a name="line.7312"></a> -<span class="sourceLineNo">7313</span> // here instead of WALKey directly to support legacy coprocessors.<a name="line.7313"></a> -<span class="sourceLineNo">7314</span> WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.7314"></a> -<span class="sourceLineNo">7315</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,<a name="line.7315"></a> -<span class="sourceLineNo">7316</span> nonceGroup, nonce, mvcc, this.getReplicationScope());<a name="line.7316"></a> -<span class="sourceLineNo">7317</span> try {<a name="line.7317"></a> -<span class="sourceLineNo">7318</span> long txid =<a name="line.7318"></a> -<span class="sourceLineNo">7319</span> this.wal.append(this.getRegionInfo(), walKey, walEdit, true);<a name="line.7319"></a> -<span class="sourceLineNo">7320</span> // Call sync on our edit.<a name="line.7320"></a> -<span class="sourceLineNo">7321</span> if (txid != 0) sync(txid, durability);<a name="line.7321"></a> -<span class="sourceLineNo">7322</span> writeEntry = walKey.getWriteEntry();<a name="line.7322"></a> -<span class="sourceLineNo">7323</span> } catch (IOException ioe) {<a name="line.7323"></a> -<span class="sourceLineNo">7324</span> if (walKey != null) mvcc.complete(walKey.getWriteEntry());<a name="line.7324"></a> -<span class="sourceLineNo">7325</span> throw ioe;<a name="line.7325"></a> -<span class="sourceLineNo">7326</span> }<a name="line.7326"></a> -<span class="sourceLineNo">7327</span> return writeEntry;<a name="line.7327"></a> -<span class="sourceLineNo">7328</span> }<a name="line.7328"></a> -<span class="sourceLineNo">7329</span><a name="line.7329"></a> -<span class="sourceLineNo">7330</span> /**<a name="line.7330"></a> -<span class="sourceLineNo">7331</span> * Do coprocessor pre-increment or pre-append call.<a name="line.7331"></a> -<span class="sourceLineNo">7332</span> * @return Result returned out of the coprocessor, which means bypass all further processing and<a name="line.7332"></a> -<span class="sourceLineNo">7333</span> * return the proffered Result instead, or null which means proceed.<a name="line.7333"></a> -<span class="sourceLineNo">7334</span> */<a name="line.7334"></a> -<span class="sourceLineNo">7335</span> private Result doCoprocessorPreCall(final Operation op, final Mutation mutation)<a name="line.7335"></a> -<span class="sourceLineNo">7336</span> throws IOException {<a name="line.7336"></a> -<span class="sourceLineNo">7337</span> Result result = null;<a name="line.7337"></a> -<span class="sourceLineNo">7338</span> if (this.coprocessorHost != null) {<a name="line.7338"></a> -<span class="sourceLineNo">7339</span> switch(op) {<a name="line.7339"></a> -<span class="sourceLineNo">7340</span> case INCREMENT:<a name="line.7340"></a> -<span class="sourceLineNo">7341</span> result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation);<a name="line.7341"></a> -<span class="sourceLineNo">7342</span> break;<a name="line.7342"></a> -<span class="sourceLineNo">7343</span> case APPEND:<a name="line.7343"></a> -<span class="sourceLineNo">7344</span> result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation);<a name="line.7344"></a> -<span class="sourceLineNo">7345</span> break;<a name="line.7345"></a> -<span class="sourceLineNo">7346</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7346"></a> -<span class="sourceLineNo">7347</span> }<a name="line.7347"></a> -<span class="sourceLineNo">7348</span> }<a name="line.7348"></a> -<span class="sourceLineNo">7349</span> return result;<a name="line.7349"></a> -<span class="sourceLineNo">7350</span> }<a name="line.7350"></a> -<span class="sourceLineNo">7351</span><a name="line.7351"></a> -<span class="sourceLineNo">7352</span> /**<a name="line.7352"></a> -<span class="sourceLineNo">7353</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not<a name="line.7353"></a> -<span class="sourceLineNo">7354</span> * always the same dependent on whether to write WAL or if the amount to increment is zero (in<a name="line.7354"></a> -<span class="sourceLineNo">7355</span> * this case we write back nothing, just return latest Cell value to the client).<a name="line.7355"></a> -<span class="sourceLineNo">7356</span> *<a name="line.7356"></a> -<span class="sourceLineNo">7357</span> * @param results Fill in here what goes back to the Client if it is non-null (if null, client<a name="line.7357"></a> -<span class="sourceLineNo">7358</span> * doesn't want results).<a name="line.7358"></a> -<span class="sourceLineNo">7359</span> * @param forMemStore Fill in here what to apply to the MemStore (by Store).<a name="line.7359"></a> -<span class="sourceLineNo">7360</span> * @return A WALEdit to apply to WAL or null if we are to skip the WAL.<a name="line.7360"></a> -<span class="sourceLineNo">7361</span> */<a name="line.7361"></a> -<span class="sourceLineNo">7362</span> private WALEdit reckonDeltas(final Operation op, final Mutation mutation,<a name="line.7362"></a> -<span class="sourceLineNo">7363</span> final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,<a name="line.7363"></a> -<span class="sourceLineNo">7364</span> final List<Cell> results)<a name="line.7364"></a> -<span class="sourceLineNo">7365</span> throws IOException {<a name="line.7365"></a> -<span class="sourceLineNo">7366</span> WALEdit walEdit = null;<a name="line.7366"></a> -<span class="sourceLineNo">7367</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7367"></a> -<span class="sourceLineNo">7368</span> final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;<a name="line.7368"></a> -<span class="sourceLineNo">7369</span> // Process a Store/family at a time.<a name="line.7369"></a> -<span class="sourceLineNo">7370</span> for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {<a name="line.7370"></a> -<span class="sourceLineNo">7371</span> final byte [] columnFamilyName = entry.getKey();<a name="line.7371"></a> -<span class="sourceLineNo">7372</span> List<Cell> deltas = entry.getValue();<a name="line.7372"></a> -<span class="sourceLineNo">7373</span> Store store = this.stores.get(columnFamilyName);<a name="line.7373"></a> -<span class="sourceLineNo">7374</span> // Reckon for the Store what to apply to WAL and MemStore.<a name="line.7374"></a> -<span class="sourceLineNo">7375</span> List<Cell> toApply =<a name="line.7375"></a> -<span class="sourceLineNo">7376</span> reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);<a name="line.7376"></a> -<span class="sourceLineNo">7377</span> if (!toApply.isEmpty()) {<a name="line.7377"></a> -<span class="sourceLineNo">7378</span> forMemStore.put(store, toApply);<a name="line.7378"></a> -<span class="sourceLineNo">7379</span> if (writeToWAL) {<a name="line.7379"></a> -<span class="sourceLineNo">7380</span> if (walEdit == null) {<a name="line.7380"></a> -<span class="sourceLineNo">7381</span> walEdit = new WALEdit();<a name="line.7381"></a> -<span class="sourceLineNo">7382</span> }<a name="line.7382"></a> -<span class="sourceLineNo">7383</span> walEdit.getCells().addAll(toApply);<a name="line.7383"></a> -<span class="sourceLineNo">7384</span> }<a name="line.7384"></a> -<span class="sourceLineNo">7385</span> }<a name="line.7385"></a> -<span class="sourceLineNo">7386</span> }<a name="line.7386"></a> -<span class="sourceLineNo">7387</span> return walEdit;<a name="line.7387"></a> -<span class="sourceLineNo">7388</span> }<a name="line.7388"></a> -<span class="sourceLineNo">7389</span><a name="line.7389"></a> -<span class="sourceLineNo">7390</span> /**<a name="line.7390"></a> -<span class="sourceLineNo">7391</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed<a name="line.7391"></a> -<span class="sourceLineNo">7392</span> * column family/Store.<a name="line.7392"></a> -<span class="sourceLineNo">7393</span> *<a name="line.7393"></a> -<span class="sourceLineNo">7394</span> * Does Get of current value and then adds passed in deltas for this Store returning the result.<a name="line.7394"></a> -<span class="sourceLineNo">7395</span> *<a name="line.7395"></a> -<span class="sourceLineNo">7396</span> * @param op Whether Increment or Append<a name="line.7396"></a> -<span class="sourceLineNo">7397</span> * @param mutation The encompassing Mutation object<a name="line.7397"></a> -<span class="sourceLineNo">7398</span> * @param deltas Changes to apply to this Store; either increment amount or data to append<a name="line.7398"></a> -<span class="sourceLineNo">7399</span> * @param results In here we accumulate all the Cells we are to return to the client; this List<a name="line.7399"></a> -<span class="sourceLineNo">7400</span> * can be larger than what we return in case where delta is zero; i.e. don't write<a name="line.7400"></a> -<span class="sourceLineNo">7401</span> * out new values, just return current value. If null, client doesn't want results returned.<a name="line.7401"></a> -<span class="sourceLineNo">7402</span> * @return Resulting Cells after <code>deltas</code> have been applied to current<a name="line.7402"></a> -<span class="sourceLineNo">7403</span> * values. Side effect is our filling out of the <code>results</code> List.<a name="line.7403"></a> -<span class="sourceLineNo">7404</span> */<a name="line.7404"></a> -<span class="sourceLineNo">7405</span> private List<Cell> reckonDeltasByStore(final Store store, final Operation op,<a name="line.7405"></a> -<span class="sourceLineNo">7406</span> final Mutation mutation, final Durability effectiveDurability, final long now,<a name="line.7406"></a> -<span class="sourceLineNo">7407</span> final List<Cell> deltas, final List<Cell> results)<a name="line.7407"></a> -<span class="sourceLineNo">7408</span> throws IOException {<a name="line.7408"></a> -<span class="sourceLineNo">7409</span> byte [] columnFamily = store.getFamily().getName();<a name="line.7409"></a> -<span class="sourceLineNo">7410</span> List<Cell> toApply = new ArrayList<>(deltas.size());<a name="line.7410"></a> -<span class="sourceLineNo">7411</span> // Get previous values for all columns in this family.<a name="line.7411"></a> -<span class="sourceLineNo">7412</span> List<Cell> currentValues = get(mutation, store, deltas,<a name="line.7412"></a> -<span class="sourceLineNo">7413</span> null/*Default IsolationLevel*/,<a name="line.7413"></a> -<span class="sourceLineNo">7414</span> op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null);<a name="line.7414"></a> -<span class="sourceLineNo">7415</span> // Iterate the input columns and update existing values if they were found, otherwise<a name="line.7415"></a> -<span class="sourceLineNo">7416</span> // add new column initialized to the delta amount<a name="line.7416"></a> -<span class="sourceLineNo">7417</span> int currentValuesIndex = 0;<a name="line.7417"></a> -<span class="sourceLineNo">7418</span> for (int i = 0; i < deltas.size(); i++) {<a name="line.7418"></a> -<span class="sourceLineNo">7419</span> Cell delta = deltas.get(i);<a name="line.7419"></a> -<span class="sourceLineNo">7420</span> Cell currentValue = null;<a name="line.7420"></a> -<span class="sourceLineNo">7421</span> boolean firstWrite = false;<a name="line.7421"></a> -<span class="sourceLineNo">7422</span> if (currentValuesIndex < currentValues.size() &&<a name="line.7422"></a> -<span class="sourceLineNo">7423</span> CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {<a name="line.7423"></a> -<span class="sourceLineNo">7424</span> currentValue = currentValues.get(currentValuesIndex);<a name="line.7424"></a> -<span class="sourceLineNo">7425</span> if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {<a name="line.7425"></a> -<span class="sourceLineNo">7426</span> currentValuesIndex++;<a name="line.7426"></a> -<span class="sourceLineNo">7427</span> }<a name="line.7427"></a> -<span class="sourceLineNo">7428</span> } else {<a name="line.7428"></a> -<span class="sourceLineNo">7429</span> firstWrite = true;<a name="line.7429"></a> -<span class="sourceLineNo">7430</span> }<a name="line.7430"></a> -<span class="sourceLineNo">7431</span> // Switch on whether this an increment or an append building the new Cell to apply.<a name="line.7431"></a> -<span class="sourceLineNo">7432</span> Cell newCell = null;<a name="line.7432"></a> -<span class="sourceLineNo">7433</span> MutationType mutationType = null;<a name="line.7433"></a> -<span class="sourceLineNo">7434</span> boolean apply = true;<a name="line.7434"></a> -<span class="sourceLineNo">7435</span> switch (op) {<a name="line.7435"></a> -<span class="sourceLineNo">7436</span> case INCREMENT:<a name="line.7436"></a> -<span class="sourceLineNo">7437</span> mutationType = MutationType.INCREMENT;<a name="line.7437"></a> -<span class="sourceLineNo">7438</span> // If delta amount to apply is 0, don't write WAL or MemStore.<a name="line.7438"></a> -<span class="sourceLineNo">7439</span> long deltaAmount = getLongValue(delta);<a name="line.7439"></a> -<span class="sourceLineNo">7440</span> apply = deltaAmount != 0;<a name="line.7440"></a> -<span class="sourceLineNo">7441</span> newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now,<a name="line.7441"></a> -<span class="sourceLineNo">7442</span> (Increment)mutation);<a name="line.7442"></a> -<span class="sourceLineNo">7443</span> break;<a name="line.7443"></a> -<span class="sourceLineNo">7444</span> case APPEND:<a name="line.7444"></a> -<span class="sourceLineNo">7445</span> mutationType = MutationType.APPEND;<a name="line.7445"></a> -<span class="sourceLineNo">7446</span> // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to.<a name="line.7446"></a> -<span class="sourceLineNo">7447</span> newCell = reckonAppend(delta, currentValue, now, (Append)mutation);<a name="line.7447"></a> -<span class="sourceLineNo">7448</span> break;<a name="line.7448"></a> -<span class="sourceLineNo">7449</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7449"></a> -<span class="sourceLineNo">7450</span> }<a name="line.7450"></a> -<span class="sourceLineNo">7451</span><a name="line.7451"></a> -<span class="sourceLineNo">7452</span> // Give coprocessors a chance to update the new cell<a name="line.7452"></a> -<span class="sourceLineNo">7453</span> if (coprocessorHost != null) {<a name="line.7453"></a> -<span class="sourceLineNo">7454</span> newCell =<a name="line.7454"></a> -<span class="sourceLineNo">7455</span> coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);<a name="line.7455"></a> -<span class="sourceLineNo">7456</span> }<a name="line.7456"></a> -<span class="sourceLineNo">7457</span> // If apply, we need to update memstore/WAL with new value; add it toApply.<a name="line.7457"></a> -<span class="sourceLineNo">7458</span> if (apply || firstWrite) {<a name="line.7458"></a> -<span class="sourceLineNo">7459</span> toApply.add(newCell);<a name="line.7459"></a> -<span class="sourceLineNo">7460</span> }<a name="line.7460"></a> -<span class="sourceLineNo">7461</span> // Add to results to get returned to the Client. If null, cilent does not want results.<a name="line.7461"></a> -<span class="sourceLineNo">7462</span> if (results != null) {<a name="line.7462"></a> -<span class="sourceLineNo">7463</span> results.add(newCell);<a name="line.7463"></a> -<span class="sourceLineNo">7464</span> }<a name="line.7464"></a> -<span class="sourceLineNo">7465</span> }<a name="line.7465"></a> -<span class="sourceLineNo">7466</span> return toApply;<a name="line.7466"></a> -<span class="sourceLineNo">7467</span> }<a name="line.7467"></a> -<span class="sourceLineNo">7468</span><a name="line.7468"></a> -<span class="sourceLineNo">7469</span> /**<a name="line.7469"></a> -<span class="sourceLineNo">7470</span> * Calculate new Increment Cell.<a name="line.7470"></a> -<span class="sourceLineNo">7471</span> * @return New Increment Cell with delta applied to currentValue if currentValue is not null;<a name="line.7471"></a> -<span class="sourceLineNo">7472</span> * otherwise, a new Cell with the delta set as its value.<a name="line.7472"></a> -<span class="sourceLineNo">7473</span> */<a name="line.7473"></a> -<span class="sourceLineNo">7474</span> private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue,<a name="line.7474"></a> -<span class="sourceLineNo">7475</span> byte [] columnFamily, final long now, Mutation mutation)<a name="line.7475"></a> -<span class="sourceLineNo">7476</span> throws IOException {<a name="line.7476"></a> -<span class="sourceLineNo">7477</span> // Forward any tags found on the delta.<a name="line.7477"></a> -<span class="sourceLineNo">7478</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7478"></a> -<span class="sourceLineNo">7479</span> long newValue = deltaAmount;<a name="line.7479"></a> -<span class="sourceLineNo">7480</span> long ts = now;<a name="line.7480"></a> -<span class="sourceLineNo">7481</span> if (currentValue != null) {<a name="line.7481"></a> -<span class="sourceLineNo">7482</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7482"></a> -<span class="sourceLineNo">7483</span> ts = Math.max(now, currentValue.getTimestamp() + 1);<a name="line.7483"></a> -<span class="sourceLineNo">7484</span> newValue += getLongValue(currentValue);<a name="line.7484"></a> -<span class="sourceLineNo">7485</span> }<a name="line.7485"></a> -<span class="sourceLineNo">7486</span> // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made...<a name="line.7486"></a> -<span class="sourceLineNo">7487</span> // doesn't work well with offheaping or if we are doing a different Cell type.<a name="line.7487"></a> -<span class="sourceLineNo">7488</span> byte [] incrementAmountInBytes = Bytes.toBytes(newValue);<a name="line.7488"></a> -<span class="sourceLineNo">7489</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7489"></a> -<span class="sourceLineNo">7490</span> byte [] row = mutation.getRow();<a name="line.7490"></a> -<span class="sourceLineNo">7491</span> return new KeyValue(row, 0, row.length,<a name="line.7491"></a> -<span class="sourceLineNo">7492</span> columnFamily, 0, columnFamily.length,<a name="line.7492"></a> -<span class="sourceLineNo">7493</span> delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(),<a name="line.7493"></a> -<span class="sourceLineNo">7494</span> ts, KeyValue.Type.Put,<a name="line.7494"></a> -<span class="sourceLineNo">7495</span> incrementAmountInBytes, 0, incrementAmountInBytes.length,<a name="line.7495"></a> -<span class="sourceLineNo">7496</span> tags);<a name="line.7496"></a> -<span class="sourceLineNo">7497</span> }<a name="line.7497"></a> -<span class="sourceLineNo">7498</span><a name="line.7498"></a> -<span class="sourceLineNo">7499</span> private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now,<a name="line.7499"></a> -<span class="sourceLineNo">7500</span> Append mutation)<a name="line.7500"></a> -<span class="sourceLineNo">7501</span> throws IOException {<a name="line.7501"></a> -<span class="sourceLineNo">7502</span> // Forward any tags found on the delta.<a name="line.7502"></a> -<span class="sourceLineNo">7503</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7503"></a> -<span class="sourceLineNo">7504</span> long ts = now;<a name="line.7504"></a> -<span class="sourceLineNo">7505</span> Cell newCell = null;<a name="line.7505"></a> -<span class="sourceLineNo">7506</span> byte [] row = mutation.getRow();<a name="line.7506"></a> -<span class="sourceLineNo">7507</span> if (currentValue != null) {<a name="line.7507"></a> -<span class="sourceLineNo">7508</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7508"></a> -<span class="sourceLineNo">7509</span> ts = Math.max(now, currentValue.getTimestamp() + 1);<a name="line.7509"></a> -<span class="sourceLineNo">7510</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7510"></a> -<span class="sourceLineNo">7511</span> byte[] tagBytes = TagUtil.fromList(tags);<a name="line.7511"></a> -<span class="sourceLineNo">7512</span> // Allocate an empty cell and copy in all parts.<a name="line.7512"></a> -<span class="sourceLineNo">7513</span> // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing<a name="line.7513"></a> -<span class="sourceLineNo">7514</span> // other Cell types. Copying on-heap too if an off-heap Cell.<a name="line.7514"></a> -<span class="sourceLineNo">7515</span> newCell = new KeyValue(row.length, delta.getFamilyLength(),<a name="line.7515"></a> -<span class="sourceLineNo">7516</span> delta.getQualifierLength(), ts, KeyValue.Type.Put,<a name="line.7516"></a> -<span class="sourceLineNo">7517</span> delta.getValueLength() + currentValue.getValueLength(),<a name="line.7517"></a> -<span class="sourceLineNo">7518</span> tagBytes == null? 0: tagBytes.length);<a name="line.7518"></a> -<span class="sourceLineNo">7519</span> // Copy in row, family, and qualifier<a name="line.7519"></a> -<span class="sourceLineNo">7520</span> System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length);<a name="line.7520"></a> -<span class="sourceLineNo">7521</span> System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(),<a name="line.7521"></a> -<span class="sourceLineNo">7522</span> newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength());<a name="line.7522"></a> -<span class="sourceLineNo">7523</span> System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(),<a name="line.7523"></a> -<span class="sourceLineNo">7524</span> newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength());<a name="line.7524"></a> -<span class="sourceLineNo">7525</span> // Copy in the value<a name="line.7525"></a> -<span class="sourceLineNo">7526</span> CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset());<a name="line.7526"></a> -<span class="sourceLineNo">7527</span> System.arraycopy(delta.getValueArray(), delta.getValueOffset(),<a name="line.7527"></a> -<span class="sourceLineNo">7528</span> newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(),<a name="line.7528"></a> -<span class="sourceLineNo">7529</span> delta.getValueLength());<a name="line.7529"></a> -<span class="sourceLineNo">7530</span> // Copy in tag data<a name="line.7530"></a> -<span class="sourceLineNo">7531</span> if (tagBytes != null) {<a name="line.7531"></a> -<span class="sourceLineNo">7532</span> System.arraycopy(tagBytes, 0,<a name="line.7532"></a> -<span class="sourceLineNo">7533</span> newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length);<a name="line.7533"></a> -<span class="sourceLineNo">7534</span> }<a name="line.7534"></a> -<span class="sourceLineNo">7535</span> } else {<a name="line.7535"></a> -<span class="sourceLineNo">7536</span> // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP<a name="line.7536"></a> -<span class="sourceLineNo">7537</span> CellUtil.updateLatestStamp(delta, now);<a name="line.7537"></a> -<span class="sourceLineNo">7538</span> newCell = delta;<a name="line.7538"></a> -<span class="sourceLineNo">7539</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7539"></a> -<span class="sourceLineNo">7540</span> if (tags != null) {<a name="line.7540"></a> -<span class="sourceLineNo">7541</span> newCell = CellUtil.createCell(delta, tags);<a name="line.7541"></a> -<span class="sourceLineNo">7542</span> }<a name="line.7542"></a> -<span class="sourceLineNo">7543</span> }<a name="line.7543"></a> -<span class="sourceLineNo">7544</span> return newCell;<a name="line.7544"></a> -<span class="sourceLineNo">7545</span> }<a name="line.7545"></a> -<span class="sourceLineNo">7546</span><a name="line.7546"></a> -<span class="sourceLineNo">7547</span> /**<a name="line.7547"></a> -<span class="sourceLineNo">7548</span> * @return Get the long out of the passed in Cell<a name="line.7548"></a> -<span class="sourceLineNo">7549</span> */<a name="line.7549"></a> -<span class="sourceLineNo">7550</span> private static long getLongValue(final Cell cell) throws DoNotRetryIOException {<a name="line.7550"></a> -<span class="sourceLineNo">7551</span> int len = cell.getValueLength();<a name="line.7551"></a> -<span class="sourceLineNo">7552</span> if (len != Bytes.SIZEOF_LONG) {<a name="line.7552"></a> -<span class="sourceLineNo">7553</span> // throw DoNotRetryIOException instead of IllegalArgumentException<a name="line.7553"></a> -<span class="sourceLineNo">7554</span> throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");<a name="line.7554"></a> -<span class="sourceLineNo">7555</span> }<a name="line.7555"></a> -<span class="sourceLineNo">7556</span> return CellUtil.getValueAsLong(cell);<a name="line.7556"></a> -<span class="sourceLineNo">7557</span> }<a name="line.7557"></a> -<span class="sourceLineNo">7558</span><a name="line.7558"></a> -<span class="sourceLineNo">7559</span> /**<a name="line.7559"></a> -<span class="sourceLineNo">7560</span> * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.<a name="line.7560"></a> -<span class="sourceLineNo">7561</span> * @param mutation Mutation we are doing this Get for.<a name="line.7561"></a> -<span class="sourceLineNo">7562</span> * @param store Which column family on row (TODO: Go all Gets in one go)<a name="line.7562"></a> -<span class="sourceLineNo">7563</span> * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.<a name="line.7563"></a> -<span class="sourceLineNo">7564</span> * @return Return list of Cells found.<a name="line.7564"></a> -<span class="sourceLineNo">7565</span> */<a name="line.7565"></a> -<span class="sourceLineNo">7566</span> private List<Cell> get(final Mutation mutation, final Store store,<a name="line.7566"></a> -<span class="sourceLineNo">7567</span> final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)<a name="line.7567"></a> -<span class="sourceLineNo">7568</span> throws IOException {<a name="line.7568"></a> -<span class="sourceLineNo">7569</span> // Sort the cells so that they match the order that they appear in the Get results. Otherwise,<a name="line.7569"></a> -<span class="sourceLineNo">7570</span> // we won't be able to find the existing values if the cells are not specified in order by the<a name="line.7570"></a> -<span class="sourceLineNo">7571</span> // client since cells are in an array list.<a name="line.7571"></a> -<span class="sourceLineNo">7572</span> // TODO: I don't get why we are sorting. St.Ack 20150107<a name="line.7572"></a> -<span class="sourceLineNo">7573</span> sort(coordinates, store.getComparator());<a name="line.7573"></a> -<span class="sourceLineNo">7574</span> Get get = new Get(mutation.getRow());<a name="line.7574"></a> -<span class="sourceLineNo">7575</span> if (isolation != null) {<a name="line.7575"></a> -<span class="sourceLineNo">7576</span> get.setIsolationLevel(isolation);<a name="line.7576"></a> -<span class="sourceLineNo">7577</span> }<a name="line.7577"></a> -<span class="sourceLineNo">7578</span> for (Cell cell: coordinates) {<a name="line.7578"></a> -<span class="sourceLineNo">7579</span> get.addColumn(store.getFamily().getName(), CellUtil.cloneQualifier(cell));<a name="line.7579"></a> -<span class="sourceLineNo">7580</span> }<a name="line.7580"></a> -<span class="sourceLineNo">7581</span> // Increments carry time range. If an Increment instance, put it on the Get.<a name="line.7581"></a> -<span class="sourceLineNo">7582</span> if (tr != null) {<a name="line.7582"></a> -<span class="sourceLineNo">7583</span> get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.7583"></a> -<span class="sourceLineNo">7584</span> }<a name="line.7584"></a> -<span class="sourceLineNo">7585</span> return get(get, false);<a name="line.7585"></a> -<span class="sourceLineNo">7586</span> }<a name="line.7586"></a> -<span class="sourceLineNo">7587</span><a name="line.7587"></a> -<span class="sourceLineNo">7588</span> /**<a name="line.7588"></a> -<span class="sourceLineNo">7589</span> * @return Sorted list of <code>cells</code> using <code>comparator</code><a name="line.7589"></a> -<span class="sourceLineNo">7590</span> */<a name="line.7590"></a> -<span class="sourceLineNo">7591</span> private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {<a name="line.7591"></a> -<span class="sourceLineNo">7592</span> Collections.sort(cells, comparator);<a name="line.7592"></a> -<span class="sourceLineNo">7593</span> return cells;<a name="line.7593"></a> -<span class="sourceLineNo">7594</span> }<a name="line.7594"></a> -<span class="sourceLineNo">7595</span><a name="line.7595"></a> -<span class="sourceLineNo">7596</span> //<a name="line.7596"></a> -<span class="sourceLineNo">7597</span> // New HBASE-880 Helpers<a name="line.7597"></a> -<span class="sourceLineNo">7598</span> //<a name="line.7598"></a> -<span class="sourceLineNo">7599</span><a name="line.7599"></a> -<span class="sourceLineNo">7600</span> void checkFamily(final byte [] family)<a name="line.7600"></a> -<span class="sourceLineNo">7601</span> throws NoSuchColumnFamilyException {<a name="line.7601"></a> -<span class="sourceLineNo">7602</span> if (!this.htableDescriptor.hasFamily(family)) {<a name="line.7602"></a> -<span class="sourceLineNo">7603</span> throw new NoSuchColumnFamilyException("Column family " +<a name="line.7603"></a> -<span class="sourceLineNo">7604</span> Bytes.toString(family) + " does not exist in region " + this<a name="line.7604"></a> -<span class="sourceLineNo">7605</span> + " in table " + this.htableDescriptor);<a name="line.7605"></a> -<span class="sourceLineNo">7606</span> }<a name="line.7606"></a> -<span class="sourceLineNo">7607</span> }<a name="line.7607"></a> -<span class="sourceLineNo">7608</span><a name="line.7608"></a> -<span class="sourceLineNo">7609</span> public static final long FIXED_OVERHEAD = ClassSize.align(<a name="line.7609"></a> -<span class="sourceLineNo">7610</span> ClassSize.OBJECT +<a name="line.7610"></a> -<span class="sourceLineNo">7611</span> ClassSize.ARRAY +<a name="line.7611"></a> -<span class="sourceLineNo">7612</span> 49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +<a name="line.7612"></a> -<span class="sourceLineNo">7613</span> (14 * Bytes.SIZEOF_LONG) +<a name="line.7613"></a> -<span class="sourceLineNo">7614</span> 6 * Bytes.SIZEOF_BOOLEAN);<a name="line.7614"></a> -<span class="sourceLineNo">7615</span><a name="line.7615"></a> -<span class="sourceLineNo">7616</span> // woefully out of date - currently missing:<a name="line.7616"></a> -<span class="sourceLineNo">7617</span> // 1 x HashMap - coprocessorServiceHandlers<a name="line.7617"></a> -<span class="sourceLineNo">7618</span> // 6 x LongAdder - numMutationsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.7618"></a> -<span class="sourceLineNo">7619</span> // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,<a name="line.7619"></a> -<span class="sourceLineNo">7620</span> // writeRequestsCount<a name="line.7620"></a> -<span class="sourceLineNo">7621</span> // 1 x HRegion$WriteState - writestate<a name="line.7621"></a> -<span class="sourceLineNo">7622</span> // 1 x RegionCoprocessorHost - coprocessorHost<a name="line.7622"></a> -<span class="sourceLineNo">7623</span> // 1 x RegionSplitPolicy - splitPolicy<a name="line.7623"></a> -<span class="sourceLineNo">7624</span> // 1 x MetricsRegion - metricsRegion<a name="line.7624"></a> -<span class="sourceLineNo">7625</span> // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper<a name="line.7625"></a> -<span class="sourceLineNo">7626</span> public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +<a name="line.7626"></a> -<span class="sourceLineNo">7627</span> ClassSize.OBJECT + // closeLock<a name="line.7627"></a> -<span class="sourceLineNo">7628</span> (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing<a name="line.7628"></a> -<span class="sourceLineNo">7629</span> (4 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.7629"></a> -<span class="sourceLineNo">7630</span> // compactionsFailed<a name="line.7630"></a> -<span class="sourceLineNo">7631</span> (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints<a name="line.7631"></a> -<span class="sourceLineNo">7632</span> WriteState.HEAP_SIZE + // writestate<a name="line.7632"></a> -<span class="sourceLineNo">7633</span> ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores<a name="line.7633"></a> -<span class="sourceLineNo">7634</span> (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock<a name="line.7634"></a> -<span class="sourceLineNo">7635</span> MultiVersionConcurrencyControl.FIXED_SIZE // mvcc<a name="line.7635"></a> -<span class="sourceLineNo">7636</span> + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes<a name="line.7636"></a> -<span class="sourceLineNo">7637</span> + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress<a name="line.7637"></a> -<span class="sourceLineNo">7638</span> + ClassSize.STORE_SERVICES // store services<a name="line.7638"></a> -<span class="sourceLineNo">7639</span> ;<a name="line.7639"></a> -<span class="sourceLineNo">7640</span><a name="line.7640"></a> -<span class="sourceLineNo">7641</span> @Override<a name="line.7641"></a> -<span class="sourceLineNo">7642</span> public long heapSize() {<a name="line.7642"></a> -<span class="sourceLineNo">7643</span> long heapSize = DEEP_OVERHEAD;<a name="line.7643"></a> -<span class="sourceLineNo">7644</span> for (Store store : this.stores.values()) {<a name="line.7644"></a> -<span class="sourceLineNo">7645</span> heapSize += store.heapSize();<a name="line.7645"></a> -<span class="sourceLineNo">7646</span> }<a name="line.7646"></a> -<span class="sourceLineNo">7647</span> // this does not take into account row locks, recent flushes, mvcc entries, and more<a name="line.7647"></a> -<span class="sourceLineNo">7648</span> return heapSize;<a name="line.7648"></a> -<span class="sourceLineNo">7649</span> }<a name="line.7649"></a> -<span class="sourceLineNo">7650</span><a name="line.7650"></a> -<span class="sourceLineNo">7651</span> @Override<a name="line.7651"></a> -<span class="sourceLineNo">7652</span> public boolean registerService(com.google.protobuf.Service instance) {<a name="line.7652"></a> -<span class="sourceLineNo">7653</span> /*<a name="line.7653"></a> -<span class="sourceLineNo">7654</span> * No stacking of instances is allowed for a single service name<a name="line.7654"></a> -<span class="sourceLineNo">7655</span> */<a name="line.7655"></a> -<span class="sourceLineNo">7656</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();<a name="line.7656"></a> -<span class="sourceLineNo">7657</span> String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);<a name="line.7657"></a> -<span class="sourceLineNo">7658</span> if (coprocessorServiceHandlers.containsKey(serviceName)) {<a name="line.7658"></a> -<span class="sourceLineNo">7659</span> LOG.error("Coprocessor service " + serviceName +<a name="line.7659"></a> -<span class="sourceLineNo">7660</span> " already registered, rejecting request from " + instance<a name="line.7660"></a> -<span class="sourceLineNo">7661</span> );<a name="line.7661"></a> -<span class="sourceLineNo">7662</span> return false;<a name="line.7662"></a> -<span class="sourceLineNo">7663</span> }<a name="line.7663"></a> -<span class="sourceLineNo">7664</span><a name="line.7664"></a> -<span class="sourceLineNo">7665</span> coprocessorServiceHandlers.put(serviceName, instance);<a name="line.7665"></a> -<span class="sourceLineNo">7666</span> if (LOG.isDebugEnabled()) {<a name="line.7666"></a> -<span class="sourceLineNo">7667</span> LOG.debug("Registered coprocessor service: region=" +<a name="line.7667"></a> -<span class="sourceLineNo">7668</span> Bytes.toStringBinary(getRegionInfo().getRegionName()) +<a name="line.7668"></a> -<span class="sourceLineNo">7669</span> " service=" + serviceName);<a name="line.7669"></a> -<span class="sourceLineNo">7670</span> }<a name="line.7670"></a> -<span class="sourceLineNo">7671</span> return true;<a name="line.7671"></a> -<span class="sourceLineNo">7672</span> }<a name="line.7672"></a> -<span class="sourceLineNo">7673</span><a name="line.7673"></a> -<span class="sourceLineNo">7674</span> @Override<a name="line.7674"></a> -<span class="sourceLineNo">7675</span> public com.google.protobuf.Message execService(com.google.protobuf.RpcController controller,<a name="line.7675"></a> -<span class="sourceLineNo">7676</span> CoprocessorServiceCall call)<a name="line.7676"></a> -<span class="sourceLineNo">7677</span> throws IOException {<a name="line.7677"></a> -<span class="sourceLineNo">7678</span> String serviceName = call.getServiceName();<a name="line.7678"></a> -<span class="sourceLineNo">7679</span> com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);<a name="line.7679"></a> -<span class="sourceLineNo">7680</span> if (service == null) {<a name="line.7680"></a> -<span class="sourceLineNo">7681</span> throw new UnknownProtocolException(null, "No registered coprocessor service found for " +<a name="line.7681"></a> -<span class="sourceLineNo">7682</span> serviceName + " in region " + Bytes.toStringBinary(getRegionInfo().getRegionName()));<a name="line.7682"></a> -<span class="sourceLineNo">7683</span> }<a name="line.7683"></a> -<span class="sourceLineNo">7684</span> com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();<a name="line.7684"></a> -<span class="sourceLineNo">7685</span><a name="line.7685"></a> -<span class="sourceLineNo">7686</span> String methodName = call.getMethodName();<a name="line.7686"></a> -<span class="sourceLineNo">7687</span> com.google.protobuf.Descriptors.MethodDescriptor methodDesc =<a name="line.7687"></a> -<span class="sourceLineNo">7688</span> CoprocessorRpcUtils.getMethodDescriptor(methodName, serviceDesc);<a name="line.7688"></a> -<span class="sourceLineNo">7689</span><a name="line.7689"></a> -<span class="sourceLineNo">7690</span> com.google.protobuf.Message.Builder builder =<a name="line.7690"></a> -<span class="sourceLineNo">7691</span> service.getRequestPrototype(methodDesc).newBuilderForType();<a name="line.7691"></a> -<span class="sourceLineNo">7692</span><a name="line.7692"></a> -<span class="sourceLineNo">7693</span> org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,<a name="line.7693"></a> -<span class="sourceLineNo">7694</span> call.getRequest().toByteArray());<a name="line.7694"></a> -<span class="sourceLineNo">7695</span> com.google.protobuf.Message request =<a name="line.7695"></a> -<span class="sourceLineNo">7696</span> CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());<a name="line.7696"></a> -<span class="sourceLineNo">7697</span><a name="line.7697"></a> -<span class="sourceLineNo">7698</span> if (coprocessorHost != null) {<a name="line.7698"></a> -<span class="sourceLineNo">7699</span> request = coprocessorHost.preEndpointInvocation(service, methodName, request);<a name="line.7699"></a> -<span class="sourceLineNo">7700</span> }<a name="line.7700"></a> -<span class="sourceLineNo">7701</span><a name="line.7701"></a> -<span class="sourceLineNo">7702</span> final com.google.protobuf.Message.Builder responseBuilder =<a name="line.7702"></a> -<span class="sourceLineNo">7703</span> service.getResponsePrototype(methodDesc).newBuilderForType();<a name="line.7703"></a> -<span class="sourceLineNo">7704</span> service.callMethod(methodDesc, controller, request,<a name="line.7704"></a> -<span class="sourceLineNo">7705</span> new com.google.protobuf.RpcCallback<com.google.protobuf.Message>() {<a name="line.7705"></a> -<span class="sourceLineNo">7706</span> @Override<a name="line.7706"></a> -<span class="sourceLineNo">7707</span> public void run(com.google.protobuf.Message message) {<a name="line.7707"></a> -<span class="sourceLineNo">7708</span> if (message != null) {<a name="line.7708"></a> -<span class="sourceLineNo">7709</span> responseBuilder.mergeFrom(message);<a name="line.7709"></a> -<span class="sourceLineNo">7710</span> }<a name="line.7710"></a> -<span class="sourceLineNo">7711</span> }<a name="line.7711"></a> -<span class="sourceLineNo">7712</span> });<a name="line.7712"></a> -<span class="sourceLineNo">7713</span><a name="line.7713"></a> -<span class="sourceLineNo">7714</span> if (coprocessorHost != null) {<a name="line.7714"></a> -<span class="sourceLineNo">7715</span> coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);<a name="line.7715"></a> -<span class="sourceLineNo">7716</span> }<a name="line.7716"></a> -<span class="sourceLineNo">7717</span> IOException exception =<a name="line.7717"></a> -<span class="sourceLineNo">7718</span> org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.getControllerException(controller);<a name="line.7718"></a> -<span class="sourceLineNo">7719</span> if (exception != null) {<a name="line.7719"></a> -<span class="sourceLineNo">7720</span> throw exception;<a name="line.7720"></a> -<span class="sourceLineNo">7721</span> }<a name="line.7721"></a> -<span class="sourceLineNo">7722</span><a name="line.7722"></a> -<span class="sourceLineNo">7723</span> return responseBuilder.build();<a name="line.7723"></a> -<span class="sourceLineNo">7724</span> }<a name="line.7724"></a> -<span class="sourceLineNo">7725</span><a name="line.7725"></a> -<span class="sourceLineNo">7726</span> boolean shouldForceSplit() {<a name="line.7726"></a> -<span class="sourceLineNo">7727</span> return this.splitRequest;<a name="line.7727"></a> -<span class="sourceLineNo">7728</span> }<a name="line.7728"></a> -<span class="sourceLineNo">7729</span><a name="line.7729"></a> -<span class="sourceLineNo">7730</span> byte[] getExplicitSplitPoint() {<a name="line.7730"></a> -<span class="sourceLineNo">7731</span> return this.explicitSplitPoint;<a name="line.7731"></a> -<span class="sourceLineNo">7732</span> }<a name="line.7732"></a> -<span class="sourceLineNo">7733</span><a name="line.7733"></a> -<span class="sourceLineNo">7734</span> void forceSplit(byte[] sp) {<a name="line.7734"></a> -<span class="sourceLineNo">7735</span> // This HRegion will go away after the forced split is successful<a name="line.7735"></a> -<span class="sourceLineNo">7736</span> // But if a forced split fails, we need to clear forced split.<a name="line.7736"></a> -<span class="sourceLineNo">7737</span> this.splitRequest = true;<a name="line.7737"></a> -<span class="sourceLineNo">7738</span> if (sp != null) {<a name="line.7738"></a> -<span class="sourceLineNo">7739</span> this.explicitSplitPoint = sp;<a name="line.7739"></a> -<span class="sourceLineNo">7740</span> }<a name="line.7740"></a> -<span class="sourceLineNo">7741</span> }<a name="line.7741"></a> -<span class="sourceLineNo">7742</span><a name="line.7742"></a> -<span class="sourceLineNo">7743</span> void clearSplit() {<a name="line.7743"></a> -<span class="sourceLineNo">7744</span> this.splitRequest = false;<a name="line.7744"></a> -<span class="sourceLineNo">7745</span> this.explicitSplitPoint = null;<a name="line.7745"></a> -<span class="sourceLineNo">7746</span> }<a name="line.7746"></a> -<span class="sourceLineNo">7747</span><a name="line.7747"></a> -<span class="sourceLineNo">7748</span> /**<a name="line.7748"></a> -<span class="sourceLineNo">7749</span> * Give the region a chance to prepare before it is split.<a name="line.7749"></a> -<span class="sourceLineNo">7750</span> */<a name="line.7750"></a> -<span class="sourceLineNo">7751</span> protected void prepareToSplit() {<a name="line.7751"></a> -<span class="sourceLineNo">7752</span> // nothing<a name="line.7752"></a> -<span class="sourceLineNo">7753</span> }<a name="line.7753"></a> -<span class="sourceLineNo">7754</span><a name="line.7754"></a> -<span class="sourceLineNo">7755</span> /**<a name="line.7755"></a> -<span class="sourceLineNo">7756</span> * Return the splitpoint. null indicates the region isn't splittable<a name="line.7756"></a> -<span class="sourceLineNo">7757</span> * If the splitpoint isn't explicitly specified, it will go over the stores<a name="line.7757"></a> -<span class="sourceLineNo">7758</span> * to find the best splitpoint. Currently the criteria of best splitpoint<a name="line.7758"></a> -<span class="sourceLineNo">7759</span> * is based on the size of the store.<a name="line.7759"></a> -<span class="sourceLineNo">7760</span> */<a name="line.7760"></a> -<span class="sourceLineNo">7761</span> public byte[] checkSplit() {<a name="line.7761"></a> -<span class="sourceLineNo">7762</span> // Can't split META<a name="line.7762"></a> -<span class="sourceLineNo">7763</span> if (this.getRegionInfo().isMetaTable() ||<a name="line.7763"></a> -<span class="sourceLineNo">7764</span> TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {<a name="line.7764"></a> -<span class="sourceLineNo">7765</span> if (shouldForceSplit()) {<a name="line.7765"></a> -<span class="sourceLineNo">7766</span> LOG.warn("Cannot split meta region in HBase 0.20 and above");<a name="line.7766"></a> -<span class="sourceLineNo">7767</span> }<a name="line.7767"></a> -<span class="sourceLineNo">7768</span> return null;<a name="line.7768"></a> -<span class="sourceLineNo">7769</span> }<a name="line.7769"></a> -<span class="sourceLineNo">7770</span><a name="line.7770"></a> -<span class="sourceLineNo">7771</span> // Can't split region which is in recovering state<a name="line.7771"></a> -<span class="sourceLineNo">7772</span> if (this.isRecovering()) {<a name="line.7772"></a> -<span class="sourceLineNo">7773</span> LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");<a name="line.7773"></a> -<span class="sourceLineNo">7774</span> return null;<a name="line.7774"></a> -<span class="sourceLineNo">7775</span> }<a name="line.7775"></a> -<span class="sourceLineNo">7776</span><a name="line.7776"></a> -<span class="sourceLineNo">7777</span> if (!splitPolicy.shouldSplit()) {<a name="line.7777"></a> -<span class="sourceLineNo">7778</span> return null;<a name="line.7778"></a> -<span class="sourceLineNo">7779</span> }<a name="line.7779"></a> -<span class="sourceLineNo">7780</span><a name="line.7780"></a> -<span class="sourceLineNo">7781</span> byte[] ret = splitPolicy.getSplitPoint();<a name="line.7781"></a> -<span class="sourceLineNo">7782</span><a name="line.7782"></a> -<span class="sourceLineNo">7783</span> if (ret != null) {<a name="line.7783"></a> -<span class="sourceLineNo">7784</span> try {<a name="line.7784"></a> -<span class="sourceLineNo">7785</span> checkRow(ret, "calculated split");<a name="line.7785"></a> -<span class="sourceLineNo">7786</span> } catch (IOException e) {<a name="line.7786"></a> -<span class="sourceLineNo">7787</span> LOG.error("Ignoring invalid split", e);<a name="line.7787"></a> -<span class="sourceLineNo">7788</span> return null;<a name="line.7788"></a> -<span class="sourceLineNo">7789</span> }<a name="line.7789"></a> -<span class="sourceLineNo">7790</span> }<a name="line.7790"></a> -<span class="sourceLineNo">7791</span> return ret;<a name="line.7791"></a> -<span class="sourceLineNo">7792</span> }<a name="line.7792"></a> -<span class="sourceLineNo">7793</span><a name="line.7793"></a> -<span class="sourceLineNo">7794</span> /**<a name="line.7794"></a> -<span class="sourceLineNo">7795</span> * @return The priority that this region should have in the compaction queue<a name="line.7795"></a> -<span class="sourceLineNo">7796</span> */<a name="line.7796"></a> -<span class="sourceLineNo">7797</span> public int getCompactPriority() {<a name="line.7797"></a> -<span class="sourceLineNo">7798</span> int count = Integer.MAX_VALUE;<a name="line.7798"></a> -<span class="sourceLineNo">7799</span> for (Store store : stores.values()) {<a name="line.7799"></a> -<span class="sourceLineNo">7800</span> count = Math.min(count, store.getCompactPriority());<a name="line.7800"></a> -<span class="sourceLineNo">7801</span> }<a name="line.7801"></a> -<span class="sourceLineNo">7802</span> return count;<a name="line.7802"></a> -<span class="sourceLineNo">7803</span> }<a name="line.7803"></a> -<span class="sourceLineNo">7804</span><a name="line.7804"></a> +<span class="sourceLineNo">6969</span> writeRequestsCount.add(mutations.size());<a name="line.6969"></a> +<span class="sourceLineNo">6970</span> MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);<a name="line.6970"></a> +<span class="sourceLineNo">6971</span> processRowsWithLocks(proc, -1, nonceGroup, nonce);<a name="line.6971"></a> +<span class="sourceLineNo">6972</span> }<a name="line.6972"></a> +<span class="sourceLineNo">6973</span><a name="line.6973"></a> +<span class="sourceLineNo">6974</span> /**<a name="line.6974"></a> +<span class="sourceLineNo">6975</span> * @return statistics about the current load of the region<a name="line.6975"></a> +<span class="sourceLineNo">6976</span> */<a name="line.6976"></a> +<span class="sourceLineNo">6977</span> public ClientProtos.RegionLoadStats getLoadStatistics() {<a name="line.6977"></a> +<span class="sourceLineNo">6978</span> if (!regionStatsEnabled) {<a name="line.6978"></a> +<span class="sourceLineNo">6979</span> return null;<a name="line.6979"></a> +<span class="sourceLineNo">6980</span> }<a name="line.6980"></a> +<span class="sourceLineNo">6981</span> ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();<a name="line.6981"></a> +<span class="sourceLineNo">6982</span> stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this<a name="line.6982"></a> +<span class="sourceLineNo">6983</span> .memstoreFlushSize)));<a name="line.6983"></a> +<span class="sourceLineNo">6984</span> if (rsServices.getHeapMemoryManager() != null) {<a name="line.6984"></a> +<span class="sourceLineNo">6985</span> // the HeapMemoryManager uses -0.0 to signal a problem asking the JVM,<a na
<TRUNCATED>