http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f32f549a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html index 1d64963..a232cc8 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.WriteState.html @@ -6980,880 +6980,882 @@ <span class="sourceLineNo">6972</span> lock(this.updatesLock.readLock());<a name="line.6972"></a> <span class="sourceLineNo">6973</span> try {<a name="line.6973"></a> <span class="sourceLineNo">6974</span> Result cpResult = doCoprocessorPreCall(op, mutation);<a name="line.6974"></a> -<span class="sourceLineNo">6975</span> if (cpResult != null) return cpResult;<a name="line.6975"></a> -<span class="sourceLineNo">6976</span> Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());<a name="line.6976"></a> -<span class="sourceLineNo">6977</span> Map<Store, List<Cell>> forMemStore =<a name="line.6977"></a> -<span class="sourceLineNo">6978</span> new HashMap<Store, List<Cell>>(mutation.getFamilyCellMap().size());<a name="line.6978"></a> -<span class="sourceLineNo">6979</span> // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and<a name="line.6979"></a> -<span class="sourceLineNo">6980</span> // what to return back to the client (in 'forMemStore' and 'results' respectively).<a name="line.6980"></a> -<span class="sourceLineNo">6981</span> WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);<a name="line.6981"></a> -<span class="sourceLineNo">6982</span> // Actually write to WAL now if a walEdit to apply.<a name="line.6982"></a> -<span class="sourceLineNo">6983</span> if (walEdit != null && !walEdit.isEmpty()) {<a name="line.6983"></a> -<span class="sourceLineNo">6984</span> writeEntry = doWALAppend(walEdit, durability, nonceGroup, nonce);<a name="line.6984"></a> -<span class="sourceLineNo">6985</span> } else {<a name="line.6985"></a> -<span class="sourceLineNo">6986</span> // If walEdits is empty, it means we skipped the WAL; update counters and start an mvcc<a name="line.6986"></a> -<span class="sourceLineNo">6987</span> // transaction.<a name="line.6987"></a> -<span class="sourceLineNo">6988</span> recordMutationWithoutWal(mutation.getFamilyCellMap());<a name="line.6988"></a> -<span class="sourceLineNo">6989</span> writeEntry = mvcc.begin();<a name="line.6989"></a> -<span class="sourceLineNo">6990</span> }<a name="line.6990"></a> -<span class="sourceLineNo">6991</span> // Now write to MemStore. Do it a column family at a time.<a name="line.6991"></a> -<span class="sourceLineNo">6992</span> long sequenceId = writeEntry.getWriteNumber();<a name="line.6992"></a> -<span class="sourceLineNo">6993</span> for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {<a name="line.6993"></a> -<span class="sourceLineNo">6994</span> accumulatedResultSize +=<a name="line.6994"></a> -<span class="sourceLineNo">6995</span> applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);<a name="line.6995"></a> -<span class="sourceLineNo">6996</span> }<a name="line.6996"></a> -<span class="sourceLineNo">6997</span> mvcc.completeAndWait(writeEntry);<a name="line.6997"></a> -<span class="sourceLineNo">6998</span> writeEntry = null;<a name="line.6998"></a> -<span class="sourceLineNo">6999</span> } finally {<a name="line.6999"></a> -<span class="sourceLineNo">7000</span> this.updatesLock.readLock().unlock();<a name="line.7000"></a> -<span class="sourceLineNo">7001</span> }<a name="line.7001"></a> -<span class="sourceLineNo">7002</span> // If results is null, then client asked that we not return the calculated results.<a name="line.7002"></a> -<span class="sourceLineNo">7003</span> return results != null? Result.create(results): null;<a name="line.7003"></a> -<span class="sourceLineNo">7004</span> } finally {<a name="line.7004"></a> -<span class="sourceLineNo">7005</span> // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes<a name="line.7005"></a> -<span class="sourceLineNo">7006</span> // to get current value under an exclusive lock so no need so no need to wait to return to<a name="line.7006"></a> -<span class="sourceLineNo">7007</span> // the client. Means only way to read-your-own-increment or append is to come in with an<a name="line.7007"></a> -<span class="sourceLineNo">7008</span> // a 0 increment.<a name="line.7008"></a> -<span class="sourceLineNo">7009</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7009"></a> -<span class="sourceLineNo">7010</span> rowLock.release();<a name="line.7010"></a> -<span class="sourceLineNo">7011</span> // Request a cache flush if over the limit. Do it outside update lock.<a name="line.7011"></a> -<span class="sourceLineNo">7012</span> if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();<a name="line.7012"></a> -<span class="sourceLineNo">7013</span> closeRegionOperation(op);<a name="line.7013"></a> -<span class="sourceLineNo">7014</span> if (this.metricsRegion != null) {<a name="line.7014"></a> -<span class="sourceLineNo">7015</span> switch (op) {<a name="line.7015"></a> -<span class="sourceLineNo">7016</span> case INCREMENT:<a name="line.7016"></a> -<span class="sourceLineNo">7017</span> this.metricsRegion.updateIncrement();<a name="line.7017"></a> -<span class="sourceLineNo">7018</span> break;<a name="line.7018"></a> -<span class="sourceLineNo">7019</span> case APPEND:<a name="line.7019"></a> -<span class="sourceLineNo">7020</span> this.metricsRegion.updateAppend();<a name="line.7020"></a> -<span class="sourceLineNo">7021</span> break;<a name="line.7021"></a> -<span class="sourceLineNo">7022</span> default:<a name="line.7022"></a> +<span class="sourceLineNo">6975</span> if (cpResult != null) {<a name="line.6975"></a> +<span class="sourceLineNo">6976</span> return returnResults? cpResult: null;<a name="line.6976"></a> +<span class="sourceLineNo">6977</span> }<a name="line.6977"></a> +<span class="sourceLineNo">6978</span> Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());<a name="line.6978"></a> +<span class="sourceLineNo">6979</span> Map<Store, List<Cell>> forMemStore =<a name="line.6979"></a> +<span class="sourceLineNo">6980</span> new HashMap<Store, List<Cell>>(mutation.getFamilyCellMap().size());<a name="line.6980"></a> +<span class="sourceLineNo">6981</span> // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and<a name="line.6981"></a> +<span class="sourceLineNo">6982</span> // what to return back to the client (in 'forMemStore' and 'results' respectively).<a name="line.6982"></a> +<span class="sourceLineNo">6983</span> WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);<a name="line.6983"></a> +<span class="sourceLineNo">6984</span> // Actually write to WAL now if a walEdit to apply.<a name="line.6984"></a> +<span class="sourceLineNo">6985</span> if (walEdit != null && !walEdit.isEmpty()) {<a name="line.6985"></a> +<span class="sourceLineNo">6986</span> writeEntry = doWALAppend(walEdit, durability, nonceGroup, nonce);<a name="line.6986"></a> +<span class="sourceLineNo">6987</span> } else {<a name="line.6987"></a> +<span class="sourceLineNo">6988</span> // If walEdits is empty, it means we skipped the WAL; update counters and start an mvcc<a name="line.6988"></a> +<span class="sourceLineNo">6989</span> // transaction.<a name="line.6989"></a> +<span class="sourceLineNo">6990</span> recordMutationWithoutWal(mutation.getFamilyCellMap());<a name="line.6990"></a> +<span class="sourceLineNo">6991</span> writeEntry = mvcc.begin();<a name="line.6991"></a> +<span class="sourceLineNo">6992</span> }<a name="line.6992"></a> +<span class="sourceLineNo">6993</span> // Now write to MemStore. Do it a column family at a time.<a name="line.6993"></a> +<span class="sourceLineNo">6994</span> long sequenceId = writeEntry.getWriteNumber();<a name="line.6994"></a> +<span class="sourceLineNo">6995</span> for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {<a name="line.6995"></a> +<span class="sourceLineNo">6996</span> accumulatedResultSize +=<a name="line.6996"></a> +<span class="sourceLineNo">6997</span> applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);<a name="line.6997"></a> +<span class="sourceLineNo">6998</span> }<a name="line.6998"></a> +<span class="sourceLineNo">6999</span> mvcc.completeAndWait(writeEntry);<a name="line.6999"></a> +<span class="sourceLineNo">7000</span> writeEntry = null;<a name="line.7000"></a> +<span class="sourceLineNo">7001</span> } finally {<a name="line.7001"></a> +<span class="sourceLineNo">7002</span> this.updatesLock.readLock().unlock();<a name="line.7002"></a> +<span class="sourceLineNo">7003</span> }<a name="line.7003"></a> +<span class="sourceLineNo">7004</span> // If results is null, then client asked that we not return the calculated results.<a name="line.7004"></a> +<span class="sourceLineNo">7005</span> return results != null && returnResults? Result.create(results): null;<a name="line.7005"></a> +<span class="sourceLineNo">7006</span> } finally {<a name="line.7006"></a> +<span class="sourceLineNo">7007</span> // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes<a name="line.7007"></a> +<span class="sourceLineNo">7008</span> // to get current value under an exclusive lock so no need so no need to wait to return to<a name="line.7008"></a> +<span class="sourceLineNo">7009</span> // the client. Means only way to read-your-own-increment or append is to come in with an<a name="line.7009"></a> +<span class="sourceLineNo">7010</span> // a 0 increment.<a name="line.7010"></a> +<span class="sourceLineNo">7011</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7011"></a> +<span class="sourceLineNo">7012</span> rowLock.release();<a name="line.7012"></a> +<span class="sourceLineNo">7013</span> // Request a cache flush if over the limit. Do it outside update lock.<a name="line.7013"></a> +<span class="sourceLineNo">7014</span> if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();<a name="line.7014"></a> +<span class="sourceLineNo">7015</span> closeRegionOperation(op);<a name="line.7015"></a> +<span class="sourceLineNo">7016</span> if (this.metricsRegion != null) {<a name="line.7016"></a> +<span class="sourceLineNo">7017</span> switch (op) {<a name="line.7017"></a> +<span class="sourceLineNo">7018</span> case INCREMENT:<a name="line.7018"></a> +<span class="sourceLineNo">7019</span> this.metricsRegion.updateIncrement();<a name="line.7019"></a> +<span class="sourceLineNo">7020</span> break;<a name="line.7020"></a> +<span class="sourceLineNo">7021</span> case APPEND:<a name="line.7021"></a> +<span class="sourceLineNo">7022</span> this.metricsRegion.updateAppend();<a name="line.7022"></a> <span class="sourceLineNo">7023</span> break;<a name="line.7023"></a> -<span class="sourceLineNo">7024</span> }<a name="line.7024"></a> -<span class="sourceLineNo">7025</span> }<a name="line.7025"></a> -<span class="sourceLineNo">7026</span> }<a name="line.7026"></a> -<span class="sourceLineNo">7027</span> }<a name="line.7027"></a> -<span class="sourceLineNo">7028</span><a name="line.7028"></a> -<span class="sourceLineNo">7029</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup,<a name="line.7029"></a> -<span class="sourceLineNo">7030</span> long nonce)<a name="line.7030"></a> -<span class="sourceLineNo">7031</span> throws IOException {<a name="line.7031"></a> -<span class="sourceLineNo">7032</span> return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(),<a name="line.7032"></a> -<span class="sourceLineNo">7033</span> nonceGroup, nonce);<a name="line.7033"></a> -<span class="sourceLineNo">7034</span> }<a name="line.7034"></a> -<span class="sourceLineNo">7035</span><a name="line.7035"></a> -<span class="sourceLineNo">7036</span> /**<a name="line.7036"></a> -<span class="sourceLineNo">7037</span> * @return writeEntry associated with this append<a name="line.7037"></a> -<span class="sourceLineNo">7038</span> */<a name="line.7038"></a> -<span class="sourceLineNo">7039</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,<a name="line.7039"></a> -<span class="sourceLineNo">7040</span> long now, long nonceGroup, long nonce)<a name="line.7040"></a> -<span class="sourceLineNo">7041</span> throws IOException {<a name="line.7041"></a> -<span class="sourceLineNo">7042</span> WriteEntry writeEntry = null;<a name="line.7042"></a> -<span class="sourceLineNo">7043</span> // Using default cluster id, as this can only happen in the originating cluster.<a name="line.7043"></a> -<span class="sourceLineNo">7044</span> // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey<a name="line.7044"></a> -<span class="sourceLineNo">7045</span> // here instead of WALKey directly to support legacy coprocessors.<a name="line.7045"></a> -<span class="sourceLineNo">7046</span> WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.7046"></a> -<span class="sourceLineNo">7047</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,<a name="line.7047"></a> -<span class="sourceLineNo">7048</span> nonceGroup, nonce, mvcc);<a name="line.7048"></a> -<span class="sourceLineNo">7049</span> try {<a name="line.7049"></a> -<span class="sourceLineNo">7050</span> long txid =<a name="line.7050"></a> -<span class="sourceLineNo">7051</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.7051"></a> -<span class="sourceLineNo">7052</span> // Call sync on our edit.<a name="line.7052"></a> -<span class="sourceLineNo">7053</span> if (txid != 0) sync(txid, durability);<a name="line.7053"></a> -<span class="sourceLineNo">7054</span> writeEntry = walKey.getWriteEntry();<a name="line.7054"></a> -<span class="sourceLineNo">7055</span> } catch (IOException ioe) {<a name="line.7055"></a> -<span class="sourceLineNo">7056</span> if (walKey != null) mvcc.complete(walKey.getWriteEntry());<a name="line.7056"></a> -<span class="sourceLineNo">7057</span> throw ioe;<a name="line.7057"></a> -<span class="sourceLineNo">7058</span> }<a name="line.7058"></a> -<span class="sourceLineNo">7059</span> return writeEntry;<a name="line.7059"></a> -<span class="sourceLineNo">7060</span> }<a name="line.7060"></a> -<span class="sourceLineNo">7061</span><a name="line.7061"></a> -<span class="sourceLineNo">7062</span> /**<a name="line.7062"></a> -<span class="sourceLineNo">7063</span> * Do coprocessor pre-increment or pre-append call.<a name="line.7063"></a> -<span class="sourceLineNo">7064</span> * @return Result returned out of the coprocessor, which means bypass all further processing and<a name="line.7064"></a> -<span class="sourceLineNo">7065</span> * return the proffered Result instead, or null which means proceed.<a name="line.7065"></a> -<span class="sourceLineNo">7066</span> */<a name="line.7066"></a> -<span class="sourceLineNo">7067</span> private Result doCoprocessorPreCall(final Operation op, final Mutation mutation)<a name="line.7067"></a> -<span class="sourceLineNo">7068</span> throws IOException {<a name="line.7068"></a> -<span class="sourceLineNo">7069</span> Result result = null;<a name="line.7069"></a> -<span class="sourceLineNo">7070</span> if (this.coprocessorHost != null) {<a name="line.7070"></a> -<span class="sourceLineNo">7071</span> switch(op) {<a name="line.7071"></a> -<span class="sourceLineNo">7072</span> case INCREMENT:<a name="line.7072"></a> -<span class="sourceLineNo">7073</span> result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation);<a name="line.7073"></a> -<span class="sourceLineNo">7074</span> break;<a name="line.7074"></a> -<span class="sourceLineNo">7075</span> case APPEND:<a name="line.7075"></a> -<span class="sourceLineNo">7076</span> result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation);<a name="line.7076"></a> -<span class="sourceLineNo">7077</span> break;<a name="line.7077"></a> -<span class="sourceLineNo">7078</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7078"></a> -<span class="sourceLineNo">7079</span> }<a name="line.7079"></a> -<span class="sourceLineNo">7080</span> }<a name="line.7080"></a> -<span class="sourceLineNo">7081</span> return result;<a name="line.7081"></a> -<span class="sourceLineNo">7082</span> }<a name="line.7082"></a> -<span class="sourceLineNo">7083</span><a name="line.7083"></a> -<span class="sourceLineNo">7084</span> /**<a name="line.7084"></a> -<span class="sourceLineNo">7085</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not<a name="line.7085"></a> -<span class="sourceLineNo">7086</span> * always the same dependent on whether to write WAL or if the amount to increment is zero (in<a name="line.7086"></a> -<span class="sourceLineNo">7087</span> * this case we write back nothing, just return latest Cell value to the client).<a name="line.7087"></a> -<span class="sourceLineNo">7088</span> *<a name="line.7088"></a> -<span class="sourceLineNo">7089</span> * @param results Fill in here what goes back to the Client if it is non-null (if null, client<a name="line.7089"></a> -<span class="sourceLineNo">7090</span> * doesn't want results).<a name="line.7090"></a> -<span class="sourceLineNo">7091</span> * @param forMemStore Fill in here what to apply to the MemStore (by Store).<a name="line.7091"></a> -<span class="sourceLineNo">7092</span> * @return A WALEdit to apply to WAL or null if we are to skip the WAL.<a name="line.7092"></a> -<span class="sourceLineNo">7093</span> */<a name="line.7093"></a> -<span class="sourceLineNo">7094</span> private WALEdit reckonDeltas(final Operation op, final Mutation mutation,<a name="line.7094"></a> -<span class="sourceLineNo">7095</span> final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,<a name="line.7095"></a> -<span class="sourceLineNo">7096</span> final List<Cell> results)<a name="line.7096"></a> -<span class="sourceLineNo">7097</span> throws IOException {<a name="line.7097"></a> -<span class="sourceLineNo">7098</span> WALEdit walEdit = null;<a name="line.7098"></a> -<span class="sourceLineNo">7099</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7099"></a> -<span class="sourceLineNo">7100</span> final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;<a name="line.7100"></a> -<span class="sourceLineNo">7101</span> // Process a Store/family at a time.<a name="line.7101"></a> -<span class="sourceLineNo">7102</span> for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {<a name="line.7102"></a> -<span class="sourceLineNo">7103</span> final byte [] columnFamilyName = entry.getKey();<a name="line.7103"></a> -<span class="sourceLineNo">7104</span> List<Cell> deltas = entry.getValue();<a name="line.7104"></a> -<span class="sourceLineNo">7105</span> Store store = this.stores.get(columnFamilyName);<a name="line.7105"></a> -<span class="sourceLineNo">7106</span> // Reckon for the Store what to apply to WAL and MemStore.<a name="line.7106"></a> -<span class="sourceLineNo">7107</span> List<Cell> toApply =<a name="line.7107"></a> -<span class="sourceLineNo">7108</span> reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);<a name="line.7108"></a> -<span class="sourceLineNo">7109</span> if (!toApply.isEmpty()) {<a name="line.7109"></a> -<span class="sourceLineNo">7110</span> forMemStore.put(store, toApply);<a name="line.7110"></a> -<span class="sourceLineNo">7111</span> if (writeToWAL) {<a name="line.7111"></a> -<span class="sourceLineNo">7112</span> if (walEdit == null) {<a name="line.7112"></a> -<span class="sourceLineNo">7113</span> walEdit = new WALEdit();<a name="line.7113"></a> -<span class="sourceLineNo">7114</span> }<a name="line.7114"></a> -<span class="sourceLineNo">7115</span> walEdit.getCells().addAll(toApply);<a name="line.7115"></a> -<span class="sourceLineNo">7116</span> }<a name="line.7116"></a> -<span class="sourceLineNo">7117</span> }<a name="line.7117"></a> -<span class="sourceLineNo">7118</span> }<a name="line.7118"></a> -<span class="sourceLineNo">7119</span> return walEdit;<a name="line.7119"></a> -<span class="sourceLineNo">7120</span> }<a name="line.7120"></a> -<span class="sourceLineNo">7121</span><a name="line.7121"></a> -<span class="sourceLineNo">7122</span> /**<a name="line.7122"></a> -<span class="sourceLineNo">7123</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed<a name="line.7123"></a> -<span class="sourceLineNo">7124</span> * column family/Store.<a name="line.7124"></a> -<span class="sourceLineNo">7125</span> *<a name="line.7125"></a> -<span class="sourceLineNo">7126</span> * Does Get of current value and then adds passed in deltas for this Store returning the result.<a name="line.7126"></a> +<span class="sourceLineNo">7024</span> default:<a name="line.7024"></a> +<span class="sourceLineNo">7025</span> break;<a name="line.7025"></a> +<span class="sourceLineNo">7026</span> }<a name="line.7026"></a> +<span class="sourceLineNo">7027</span> }<a name="line.7027"></a> +<span class="sourceLineNo">7028</span> }<a name="line.7028"></a> +<span class="sourceLineNo">7029</span> }<a name="line.7029"></a> +<span class="sourceLineNo">7030</span><a name="line.7030"></a> +<span class="sourceLineNo">7031</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup,<a name="line.7031"></a> +<span class="sourceLineNo">7032</span> long nonce)<a name="line.7032"></a> +<span class="sourceLineNo">7033</span> throws IOException {<a name="line.7033"></a> +<span class="sourceLineNo">7034</span> return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(),<a name="line.7034"></a> +<span class="sourceLineNo">7035</span> nonceGroup, nonce);<a name="line.7035"></a> +<span class="sourceLineNo">7036</span> }<a name="line.7036"></a> +<span class="sourceLineNo">7037</span><a name="line.7037"></a> +<span class="sourceLineNo">7038</span> /**<a name="line.7038"></a> +<span class="sourceLineNo">7039</span> * @return writeEntry associated with this append<a name="line.7039"></a> +<span class="sourceLineNo">7040</span> */<a name="line.7040"></a> +<span class="sourceLineNo">7041</span> private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,<a name="line.7041"></a> +<span class="sourceLineNo">7042</span> long now, long nonceGroup, long nonce)<a name="line.7042"></a> +<span class="sourceLineNo">7043</span> throws IOException {<a name="line.7043"></a> +<span class="sourceLineNo">7044</span> WriteEntry writeEntry = null;<a name="line.7044"></a> +<span class="sourceLineNo">7045</span> // Using default cluster id, as this can only happen in the originating cluster.<a name="line.7045"></a> +<span class="sourceLineNo">7046</span> // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey<a name="line.7046"></a> +<span class="sourceLineNo">7047</span> // here instead of WALKey directly to support legacy coprocessors.<a name="line.7047"></a> +<span class="sourceLineNo">7048</span> WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.7048"></a> +<span class="sourceLineNo">7049</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,<a name="line.7049"></a> +<span class="sourceLineNo">7050</span> nonceGroup, nonce, mvcc);<a name="line.7050"></a> +<span class="sourceLineNo">7051</span> try {<a name="line.7051"></a> +<span class="sourceLineNo">7052</span> long txid =<a name="line.7052"></a> +<span class="sourceLineNo">7053</span> this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.7053"></a> +<span class="sourceLineNo">7054</span> // Call sync on our edit.<a name="line.7054"></a> +<span class="sourceLineNo">7055</span> if (txid != 0) sync(txid, durability);<a name="line.7055"></a> +<span class="sourceLineNo">7056</span> writeEntry = walKey.getWriteEntry();<a name="line.7056"></a> +<span class="sourceLineNo">7057</span> } catch (IOException ioe) {<a name="line.7057"></a> +<span class="sourceLineNo">7058</span> if (walKey != null) mvcc.complete(walKey.getWriteEntry());<a name="line.7058"></a> +<span class="sourceLineNo">7059</span> throw ioe;<a name="line.7059"></a> +<span class="sourceLineNo">7060</span> }<a name="line.7060"></a> +<span class="sourceLineNo">7061</span> return writeEntry;<a name="line.7061"></a> +<span class="sourceLineNo">7062</span> }<a name="line.7062"></a> +<span class="sourceLineNo">7063</span><a name="line.7063"></a> +<span class="sourceLineNo">7064</span> /**<a name="line.7064"></a> +<span class="sourceLineNo">7065</span> * Do coprocessor pre-increment or pre-append call.<a name="line.7065"></a> +<span class="sourceLineNo">7066</span> * @return Result returned out of the coprocessor, which means bypass all further processing and<a name="line.7066"></a> +<span class="sourceLineNo">7067</span> * return the proffered Result instead, or null which means proceed.<a name="line.7067"></a> +<span class="sourceLineNo">7068</span> */<a name="line.7068"></a> +<span class="sourceLineNo">7069</span> private Result doCoprocessorPreCall(final Operation op, final Mutation mutation)<a name="line.7069"></a> +<span class="sourceLineNo">7070</span> throws IOException {<a name="line.7070"></a> +<span class="sourceLineNo">7071</span> Result result = null;<a name="line.7071"></a> +<span class="sourceLineNo">7072</span> if (this.coprocessorHost != null) {<a name="line.7072"></a> +<span class="sourceLineNo">7073</span> switch(op) {<a name="line.7073"></a> +<span class="sourceLineNo">7074</span> case INCREMENT:<a name="line.7074"></a> +<span class="sourceLineNo">7075</span> result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation);<a name="line.7075"></a> +<span class="sourceLineNo">7076</span> break;<a name="line.7076"></a> +<span class="sourceLineNo">7077</span> case APPEND:<a name="line.7077"></a> +<span class="sourceLineNo">7078</span> result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation);<a name="line.7078"></a> +<span class="sourceLineNo">7079</span> break;<a name="line.7079"></a> +<span class="sourceLineNo">7080</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7080"></a> +<span class="sourceLineNo">7081</span> }<a name="line.7081"></a> +<span class="sourceLineNo">7082</span> }<a name="line.7082"></a> +<span class="sourceLineNo">7083</span> return result;<a name="line.7083"></a> +<span class="sourceLineNo">7084</span> }<a name="line.7084"></a> +<span class="sourceLineNo">7085</span><a name="line.7085"></a> +<span class="sourceLineNo">7086</span> /**<a name="line.7086"></a> +<span class="sourceLineNo">7087</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not<a name="line.7087"></a> +<span class="sourceLineNo">7088</span> * always the same dependent on whether to write WAL or if the amount to increment is zero (in<a name="line.7088"></a> +<span class="sourceLineNo">7089</span> * this case we write back nothing, just return latest Cell value to the client).<a name="line.7089"></a> +<span class="sourceLineNo">7090</span> *<a name="line.7090"></a> +<span class="sourceLineNo">7091</span> * @param results Fill in here what goes back to the Client if it is non-null (if null, client<a name="line.7091"></a> +<span class="sourceLineNo">7092</span> * doesn't want results).<a name="line.7092"></a> +<span class="sourceLineNo">7093</span> * @param forMemStore Fill in here what to apply to the MemStore (by Store).<a name="line.7093"></a> +<span class="sourceLineNo">7094</span> * @return A WALEdit to apply to WAL or null if we are to skip the WAL.<a name="line.7094"></a> +<span class="sourceLineNo">7095</span> */<a name="line.7095"></a> +<span class="sourceLineNo">7096</span> private WALEdit reckonDeltas(final Operation op, final Mutation mutation,<a name="line.7096"></a> +<span class="sourceLineNo">7097</span> final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,<a name="line.7097"></a> +<span class="sourceLineNo">7098</span> final List<Cell> results)<a name="line.7098"></a> +<span class="sourceLineNo">7099</span> throws IOException {<a name="line.7099"></a> +<span class="sourceLineNo">7100</span> WALEdit walEdit = null;<a name="line.7100"></a> +<span class="sourceLineNo">7101</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7101"></a> +<span class="sourceLineNo">7102</span> final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;<a name="line.7102"></a> +<span class="sourceLineNo">7103</span> // Process a Store/family at a time.<a name="line.7103"></a> +<span class="sourceLineNo">7104</span> for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {<a name="line.7104"></a> +<span class="sourceLineNo">7105</span> final byte [] columnFamilyName = entry.getKey();<a name="line.7105"></a> +<span class="sourceLineNo">7106</span> List<Cell> deltas = entry.getValue();<a name="line.7106"></a> +<span class="sourceLineNo">7107</span> Store store = this.stores.get(columnFamilyName);<a name="line.7107"></a> +<span class="sourceLineNo">7108</span> // Reckon for the Store what to apply to WAL and MemStore.<a name="line.7108"></a> +<span class="sourceLineNo">7109</span> List<Cell> toApply =<a name="line.7109"></a> +<span class="sourceLineNo">7110</span> reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);<a name="line.7110"></a> +<span class="sourceLineNo">7111</span> if (!toApply.isEmpty()) {<a name="line.7111"></a> +<span class="sourceLineNo">7112</span> forMemStore.put(store, toApply);<a name="line.7112"></a> +<span class="sourceLineNo">7113</span> if (writeToWAL) {<a name="line.7113"></a> +<span class="sourceLineNo">7114</span> if (walEdit == null) {<a name="line.7114"></a> +<span class="sourceLineNo">7115</span> walEdit = new WALEdit();<a name="line.7115"></a> +<span class="sourceLineNo">7116</span> }<a name="line.7116"></a> +<span class="sourceLineNo">7117</span> walEdit.getCells().addAll(toApply);<a name="line.7117"></a> +<span class="sourceLineNo">7118</span> }<a name="line.7118"></a> +<span class="sourceLineNo">7119</span> }<a name="line.7119"></a> +<span class="sourceLineNo">7120</span> }<a name="line.7120"></a> +<span class="sourceLineNo">7121</span> return walEdit;<a name="line.7121"></a> +<span class="sourceLineNo">7122</span> }<a name="line.7122"></a> +<span class="sourceLineNo">7123</span><a name="line.7123"></a> +<span class="sourceLineNo">7124</span> /**<a name="line.7124"></a> +<span class="sourceLineNo">7125</span> * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed<a name="line.7125"></a> +<span class="sourceLineNo">7126</span> * column family/Store.<a name="line.7126"></a> <span class="sourceLineNo">7127</span> *<a name="line.7127"></a> -<span class="sourceLineNo">7128</span> * @param op Whether Increment or Append<a name="line.7128"></a> -<span class="sourceLineNo">7129</span> * @param mutation The encompassing Mutation object<a name="line.7129"></a> -<span class="sourceLineNo">7130</span> * @param deltas Changes to apply to this Store; either increment amount or data to append<a name="line.7130"></a> -<span class="sourceLineNo">7131</span> * @param results In here we accumulate all the Cells we are to return to the client; this List<a name="line.7131"></a> -<span class="sourceLineNo">7132</span> * can be larger than what we return in case where delta is zero; i.e. don't write<a name="line.7132"></a> -<span class="sourceLineNo">7133</span> * out new values, just return current value. If null, client doesn't want results returned.<a name="line.7133"></a> -<span class="sourceLineNo">7134</span> * @return Resulting Cells after <code>deltas</code> have been applied to current<a name="line.7134"></a> -<span class="sourceLineNo">7135</span> * values. Side effect is our filling out of the <code>results</code> List.<a name="line.7135"></a> -<span class="sourceLineNo">7136</span> */<a name="line.7136"></a> -<span class="sourceLineNo">7137</span> private List<Cell> reckonDeltasByStore(final Store store, final Operation op,<a name="line.7137"></a> -<span class="sourceLineNo">7138</span> final Mutation mutation, final Durability effectiveDurability, final long now,<a name="line.7138"></a> -<span class="sourceLineNo">7139</span> final List<Cell> deltas, final List<Cell> results)<a name="line.7139"></a> -<span class="sourceLineNo">7140</span> throws IOException {<a name="line.7140"></a> -<span class="sourceLineNo">7141</span> byte [] columnFamily = store.getFamily().getName();<a name="line.7141"></a> -<span class="sourceLineNo">7142</span> List<Cell> toApply = new ArrayList<Cell>(deltas.size());<a name="line.7142"></a> -<span class="sourceLineNo">7143</span> // Get previous values for all columns in this family.<a name="line.7143"></a> -<span class="sourceLineNo">7144</span> List<Cell> currentValues = get(mutation, store, deltas,<a name="line.7144"></a> -<span class="sourceLineNo">7145</span> null/*Default IsolationLevel*/,<a name="line.7145"></a> -<span class="sourceLineNo">7146</span> op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null);<a name="line.7146"></a> -<span class="sourceLineNo">7147</span> // Iterate the input columns and update existing values if they were found, otherwise<a name="line.7147"></a> -<span class="sourceLineNo">7148</span> // add new column initialized to the delta amount<a name="line.7148"></a> -<span class="sourceLineNo">7149</span> int currentValuesIndex = 0;<a name="line.7149"></a> -<span class="sourceLineNo">7150</span> for (int i = 0; i < deltas.size(); i++) {<a name="line.7150"></a> -<span class="sourceLineNo">7151</span> Cell delta = deltas.get(i);<a name="line.7151"></a> -<span class="sourceLineNo">7152</span> Cell currentValue = null;<a name="line.7152"></a> -<span class="sourceLineNo">7153</span> if (currentValuesIndex < currentValues.size() &&<a name="line.7153"></a> -<span class="sourceLineNo">7154</span> CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {<a name="line.7154"></a> -<span class="sourceLineNo">7155</span> currentValue = currentValues.get(currentValuesIndex);<a name="line.7155"></a> -<span class="sourceLineNo">7156</span> if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {<a name="line.7156"></a> -<span class="sourceLineNo">7157</span> currentValuesIndex++;<a name="line.7157"></a> -<span class="sourceLineNo">7158</span> }<a name="line.7158"></a> -<span class="sourceLineNo">7159</span> }<a name="line.7159"></a> -<span class="sourceLineNo">7160</span> // Switch on whether this an increment or an append building the new Cell to apply.<a name="line.7160"></a> -<span class="sourceLineNo">7161</span> Cell newCell = null;<a name="line.7161"></a> -<span class="sourceLineNo">7162</span> MutationType mutationType = null;<a name="line.7162"></a> -<span class="sourceLineNo">7163</span> boolean apply = true;<a name="line.7163"></a> -<span class="sourceLineNo">7164</span> switch (op) {<a name="line.7164"></a> -<span class="sourceLineNo">7165</span> case INCREMENT:<a name="line.7165"></a> -<span class="sourceLineNo">7166</span> mutationType = MutationType.INCREMENT;<a name="line.7166"></a> -<span class="sourceLineNo">7167</span> // If delta amount to apply is 0, don't write WAL or MemStore.<a name="line.7167"></a> -<span class="sourceLineNo">7168</span> long deltaAmount = getLongValue(delta);<a name="line.7168"></a> -<span class="sourceLineNo">7169</span> apply = deltaAmount != 0;<a name="line.7169"></a> -<span class="sourceLineNo">7170</span> newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now,<a name="line.7170"></a> -<span class="sourceLineNo">7171</span> (Increment)mutation);<a name="line.7171"></a> -<span class="sourceLineNo">7172</span> break;<a name="line.7172"></a> -<span class="sourceLineNo">7173</span> case APPEND:<a name="line.7173"></a> -<span class="sourceLineNo">7174</span> mutationType = MutationType.APPEND;<a name="line.7174"></a> -<span class="sourceLineNo">7175</span> // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to.<a name="line.7175"></a> -<span class="sourceLineNo">7176</span> newCell = reckonAppend(delta, currentValue, now, (Append)mutation);<a name="line.7176"></a> -<span class="sourceLineNo">7177</span> break;<a name="line.7177"></a> -<span class="sourceLineNo">7178</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7178"></a> -<span class="sourceLineNo">7179</span> }<a name="line.7179"></a> -<span class="sourceLineNo">7180</span><a name="line.7180"></a> -<span class="sourceLineNo">7181</span> // Give coprocessors a chance to update the new cell<a name="line.7181"></a> -<span class="sourceLineNo">7182</span> if (coprocessorHost != null) {<a name="line.7182"></a> -<span class="sourceLineNo">7183</span> newCell =<a name="line.7183"></a> -<span class="sourceLineNo">7184</span> coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);<a name="line.7184"></a> -<span class="sourceLineNo">7185</span> }<a name="line.7185"></a> -<span class="sourceLineNo">7186</span> // If apply, we need to update memstore/WAL with new value; add it toApply.<a name="line.7186"></a> -<span class="sourceLineNo">7187</span> if (apply) {<a name="line.7187"></a> -<span class="sourceLineNo">7188</span> toApply.add(newCell);<a name="line.7188"></a> -<span class="sourceLineNo">7189</span> }<a name="line.7189"></a> -<span class="sourceLineNo">7190</span> // Add to results to get returned to the Client. If null, cilent does not want results.<a name="line.7190"></a> -<span class="sourceLineNo">7191</span> if (results != null) {<a name="line.7191"></a> -<span class="sourceLineNo">7192</span> results.add(newCell);<a name="line.7192"></a> -<span class="sourceLineNo">7193</span> }<a name="line.7193"></a> -<span class="sourceLineNo">7194</span> }<a name="line.7194"></a> -<span class="sourceLineNo">7195</span> return toApply;<a name="line.7195"></a> -<span class="sourceLineNo">7196</span> }<a name="line.7196"></a> -<span class="sourceLineNo">7197</span><a name="line.7197"></a> -<span class="sourceLineNo">7198</span> /**<a name="line.7198"></a> -<span class="sourceLineNo">7199</span> * Calculate new Increment Cell.<a name="line.7199"></a> -<span class="sourceLineNo">7200</span> * @return New Increment Cell with delta applied to currentValue if currentValue is not null;<a name="line.7200"></a> -<span class="sourceLineNo">7201</span> * otherwise, a new Cell with the delta set as its value.<a name="line.7201"></a> -<span class="sourceLineNo">7202</span> */<a name="line.7202"></a> -<span class="sourceLineNo">7203</span> private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue,<a name="line.7203"></a> -<span class="sourceLineNo">7204</span> byte [] columnFamily, final long now, Mutation mutation)<a name="line.7204"></a> -<span class="sourceLineNo">7205</span> throws IOException {<a name="line.7205"></a> -<span class="sourceLineNo">7206</span> // Forward any tags found on the delta.<a name="line.7206"></a> -<span class="sourceLineNo">7207</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7207"></a> -<span class="sourceLineNo">7208</span> long newValue = deltaAmount;<a name="line.7208"></a> -<span class="sourceLineNo">7209</span> long ts = now;<a name="line.7209"></a> -<span class="sourceLineNo">7210</span> if (currentValue != null) {<a name="line.7210"></a> -<span class="sourceLineNo">7211</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7211"></a> -<span class="sourceLineNo">7212</span> ts = Math.max(now, currentValue.getTimestamp());<a name="line.7212"></a> -<span class="sourceLineNo">7213</span> newValue += getLongValue(currentValue);<a name="line.7213"></a> -<span class="sourceLineNo">7214</span> }<a name="line.7214"></a> -<span class="sourceLineNo">7215</span> // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made...<a name="line.7215"></a> -<span class="sourceLineNo">7216</span> // doesn't work well with offheaping or if we are doing a different Cell type.<a name="line.7216"></a> -<span class="sourceLineNo">7217</span> byte [] incrementAmountInBytes = Bytes.toBytes(newValue);<a name="line.7217"></a> -<span class="sourceLineNo">7218</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7218"></a> -<span class="sourceLineNo">7219</span> byte [] row = mutation.getRow();<a name="line.7219"></a> -<span class="sourceLineNo">7220</span> return new KeyValue(row, 0, row.length,<a name="line.7220"></a> -<span class="sourceLineNo">7221</span> columnFamily, 0, columnFamily.length,<a name="line.7221"></a> -<span class="sourceLineNo">7222</span> delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(),<a name="line.7222"></a> -<span class="sourceLineNo">7223</span> ts, KeyValue.Type.Put,<a name="line.7223"></a> -<span class="sourceLineNo">7224</span> incrementAmountInBytes, 0, incrementAmountInBytes.length,<a name="line.7224"></a> -<span class="sourceLineNo">7225</span> tags);<a name="line.7225"></a> -<span class="sourceLineNo">7226</span> }<a name="line.7226"></a> -<span class="sourceLineNo">7227</span><a name="line.7227"></a> -<span class="sourceLineNo">7228</span> private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now,<a name="line.7228"></a> -<span class="sourceLineNo">7229</span> Append mutation)<a name="line.7229"></a> -<span class="sourceLineNo">7230</span> throws IOException {<a name="line.7230"></a> -<span class="sourceLineNo">7231</span> // Forward any tags found on the delta.<a name="line.7231"></a> -<span class="sourceLineNo">7232</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7232"></a> -<span class="sourceLineNo">7233</span> long ts = now;<a name="line.7233"></a> -<span class="sourceLineNo">7234</span> Cell newCell = null;<a name="line.7234"></a> -<span class="sourceLineNo">7235</span> byte [] row = mutation.getRow();<a name="line.7235"></a> -<span class="sourceLineNo">7236</span> if (currentValue != null) {<a name="line.7236"></a> -<span class="sourceLineNo">7237</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7237"></a> -<span class="sourceLineNo">7238</span> ts = Math.max(now, currentValue.getTimestamp());<a name="line.7238"></a> -<span class="sourceLineNo">7239</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7239"></a> -<span class="sourceLineNo">7240</span> byte[] tagBytes = TagUtil.fromList(tags);<a name="line.7240"></a> -<span class="sourceLineNo">7241</span> // Allocate an empty cell and copy in all parts.<a name="line.7241"></a> -<span class="sourceLineNo">7242</span> // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing<a name="line.7242"></a> -<span class="sourceLineNo">7243</span> // other Cell types. Copying on-heap too if an off-heap Cell.<a name="line.7243"></a> -<span class="sourceLineNo">7244</span> newCell = new KeyValue(row.length, delta.getFamilyLength(),<a name="line.7244"></a> -<span class="sourceLineNo">7245</span> delta.getQualifierLength(), ts, KeyValue.Type.Put,<a name="line.7245"></a> -<span class="sourceLineNo">7246</span> delta.getValueLength() + currentValue.getValueLength(),<a name="line.7246"></a> -<span class="sourceLineNo">7247</span> tagBytes == null? 0: tagBytes.length);<a name="line.7247"></a> -<span class="sourceLineNo">7248</span> // Copy in row, family, and qualifier<a name="line.7248"></a> -<span class="sourceLineNo">7249</span> System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length);<a name="line.7249"></a> -<span class="sourceLineNo">7250</span> System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(),<a name="line.7250"></a> -<span class="sourceLineNo">7251</span> newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength());<a name="line.7251"></a> -<span class="sourceLineNo">7252</span> System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(),<a name="line.7252"></a> -<span class="sourceLineNo">7253</span> newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength());<a name="line.7253"></a> -<span class="sourceLineNo">7254</span> // Copy in the value<a name="line.7254"></a> -<span class="sourceLineNo">7255</span> CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset());<a name="line.7255"></a> -<span class="sourceLineNo">7256</span> System.arraycopy(delta.getValueArray(), delta.getValueOffset(),<a name="line.7256"></a> -<span class="sourceLineNo">7257</span> newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(),<a name="line.7257"></a> -<span class="sourceLineNo">7258</span> delta.getValueLength());<a name="line.7258"></a> -<span class="sourceLineNo">7259</span> // Copy in tag data<a name="line.7259"></a> -<span class="sourceLineNo">7260</span> if (tagBytes != null) {<a name="line.7260"></a> -<span class="sourceLineNo">7261</span> System.arraycopy(tagBytes, 0,<a name="line.7261"></a> -<span class="sourceLineNo">7262</span> newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length);<a name="line.7262"></a> -<span class="sourceLineNo">7263</span> }<a name="line.7263"></a> -<span class="sourceLineNo">7264</span> } else {<a name="line.7264"></a> -<span class="sourceLineNo">7265</span> // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP<a name="line.7265"></a> -<span class="sourceLineNo">7266</span> CellUtil.updateLatestStamp(delta, now);<a name="line.7266"></a> -<span class="sourceLineNo">7267</span> newCell = delta;<a name="line.7267"></a> -<span class="sourceLineNo">7268</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7268"></a> -<span class="sourceLineNo">7269</span> if (tags != null) {<a name="line.7269"></a> -<span class="sourceLineNo">7270</span> newCell = new TagRewriteCell(delta, TagUtil.fromList(tags));<a name="line.7270"></a> -<span class="sourceLineNo">7271</span> }<a name="line.7271"></a> -<span class="sourceLineNo">7272</span> }<a name="line.7272"></a> -<span class="sourceLineNo">7273</span> return newCell;<a name="line.7273"></a> -<span class="sourceLineNo">7274</span> }<a name="line.7274"></a> -<span class="sourceLineNo">7275</span><a name="line.7275"></a> -<span class="sourceLineNo">7276</span> /**<a name="line.7276"></a> -<span class="sourceLineNo">7277</span> * @return Get the long out of the passed in Cell<a name="line.7277"></a> -<span class="sourceLineNo">7278</span> */<a name="line.7278"></a> -<span class="sourceLineNo">7279</span> private static long getLongValue(final Cell cell) throws DoNotRetryIOException {<a name="line.7279"></a> -<span class="sourceLineNo">7280</span> int len = cell.getValueLength();<a name="line.7280"></a> -<span class="sourceLineNo">7281</span> if (len != Bytes.SIZEOF_LONG) {<a name="line.7281"></a> -<span class="sourceLineNo">7282</span> // throw DoNotRetryIOException instead of IllegalArgumentException<a name="line.7282"></a> -<span class="sourceLineNo">7283</span> throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");<a name="line.7283"></a> -<span class="sourceLineNo">7284</span> }<a name="line.7284"></a> -<span class="sourceLineNo">7285</span> return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);<a name="line.7285"></a> -<span class="sourceLineNo">7286</span> }<a name="line.7286"></a> -<span class="sourceLineNo">7287</span><a name="line.7287"></a> -<span class="sourceLineNo">7288</span> /**<a name="line.7288"></a> -<span class="sourceLineNo">7289</span> * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.<a name="line.7289"></a> -<span class="sourceLineNo">7290</span> * @param mutation Mutation we are doing this Get for.<a name="line.7290"></a> -<span class="sourceLineNo">7291</span> * @param columnFamily Which column family on row (TODO: Go all Gets in one go)<a name="line.7291"></a> -<span class="sourceLineNo">7292</span> * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.<a name="line.7292"></a> -<span class="sourceLineNo">7293</span> * @return Return list of Cells found.<a name="line.7293"></a> -<span class="sourceLineNo">7294</span> */<a name="line.7294"></a> -<span class="sourceLineNo">7295</span> private List<Cell> get(final Mutation mutation, final Store store,<a name="line.7295"></a> -<span class="sourceLineNo">7296</span> final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)<a name="line.7296"></a> -<span class="sourceLineNo">7297</span> throws IOException {<a name="line.7297"></a> -<span class="sourceLineNo">7298</span> // Sort the cells so that they match the order that they appear in the Get results. Otherwise,<a name="line.7298"></a> -<span class="sourceLineNo">7299</span> // we won't be able to find the existing values if the cells are not specified in order by the<a name="line.7299"></a> -<span class="sourceLineNo">7300</span> // client since cells are in an array list.<a name="line.7300"></a> -<span class="sourceLineNo">7301</span> // TODO: I don't get why we are sorting. St.Ack 20150107<a name="line.7301"></a> -<span class="sourceLineNo">7302</span> sort(coordinates, store.getComparator());<a name="line.7302"></a> -<span class="sourceLineNo">7303</span> Get get = new Get(mutation.getRow());<a name="line.7303"></a> -<span class="sourceLineNo">7304</span> if (isolation != null) {<a name="line.7304"></a> -<span class="sourceLineNo">7305</span> get.setIsolationLevel(isolation);<a name="line.7305"></a> -<span class="sourceLineNo">7306</span> }<a name="line.7306"></a> -<span class="sourceLineNo">7307</span> for (Cell cell: coordinates) {<a name="line.7307"></a> -<span class="sourceLineNo">7308</span> get.addColumn(store.getFamily().getName(), CellUtil.cloneQualifier(cell));<a name="line.7308"></a> -<span class="sourceLineNo">7309</span> }<a name="line.7309"></a> -<span class="sourceLineNo">7310</span> // Increments carry time range. If an Increment instance, put it on the Get.<a name="line.7310"></a> -<span class="sourceLineNo">7311</span> if (tr != null) {<a name="line.7311"></a> -<span class="sourceLineNo">7312</span> get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.7312"></a> -<span class="sourceLineNo">7313</span> }<a name="line.7313"></a> -<span class="sourceLineNo">7314</span> return get(get, false);<a name="line.7314"></a> -<span class="sourceLineNo">7315</span> }<a name="line.7315"></a> -<span class="sourceLineNo">7316</span><a name="line.7316"></a> -<span class="sourceLineNo">7317</span> /**<a name="line.7317"></a> -<span class="sourceLineNo">7318</span> * @return Sorted list of <code>cells</code> using <code>comparator</code><a name="line.7318"></a> -<span class="sourceLineNo">7319</span> */<a name="line.7319"></a> -<span class="sourceLineNo">7320</span> private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {<a name="line.7320"></a> -<span class="sourceLineNo">7321</span> Collections.sort(cells, comparator);<a name="line.7321"></a> -<span class="sourceLineNo">7322</span> return cells;<a name="line.7322"></a> -<span class="sourceLineNo">7323</span> }<a name="line.7323"></a> -<span class="sourceLineNo">7324</span><a name="line.7324"></a> -<span class="sourceLineNo">7325</span> //<a name="line.7325"></a> -<span class="sourceLineNo">7326</span> // New HBASE-880 Helpers<a name="line.7326"></a> +<span class="sourceLineNo">7128</span> * Does Get of current value and then adds passed in deltas for this Store returning the result.<a name="line.7128"></a> +<span class="sourceLineNo">7129</span> *<a name="line.7129"></a> +<span class="sourceLineNo">7130</span> * @param op Whether Increment or Append<a name="line.7130"></a> +<span class="sourceLineNo">7131</span> * @param mutation The encompassing Mutation object<a name="line.7131"></a> +<span class="sourceLineNo">7132</span> * @param deltas Changes to apply to this Store; either increment amount or data to append<a name="line.7132"></a> +<span class="sourceLineNo">7133</span> * @param results In here we accumulate all the Cells we are to return to the client; this List<a name="line.7133"></a> +<span class="sourceLineNo">7134</span> * can be larger than what we return in case where delta is zero; i.e. don't write<a name="line.7134"></a> +<span class="sourceLineNo">7135</span> * out new values, just return current value. If null, client doesn't want results returned.<a name="line.7135"></a> +<span class="sourceLineNo">7136</span> * @return Resulting Cells after <code>deltas</code> have been applied to current<a name="line.7136"></a> +<span class="sourceLineNo">7137</span> * values. Side effect is our filling out of the <code>results</code> List.<a name="line.7137"></a> +<span class="sourceLineNo">7138</span> */<a name="line.7138"></a> +<span class="sourceLineNo">7139</span> private List<Cell> reckonDeltasByStore(final Store store, final Operation op,<a name="line.7139"></a> +<span class="sourceLineNo">7140</span> final Mutation mutation, final Durability effectiveDurability, final long now,<a name="line.7140"></a> +<span class="sourceLineNo">7141</span> final List<Cell> deltas, final List<Cell> results)<a name="line.7141"></a> +<span class="sourceLineNo">7142</span> throws IOException {<a name="line.7142"></a> +<span class="sourceLineNo">7143</span> byte [] columnFamily = store.getFamily().getName();<a name="line.7143"></a> +<span class="sourceLineNo">7144</span> List<Cell> toApply = new ArrayList<Cell>(deltas.size());<a name="line.7144"></a> +<span class="sourceLineNo">7145</span> // Get previous values for all columns in this family.<a name="line.7145"></a> +<span class="sourceLineNo">7146</span> List<Cell> currentValues = get(mutation, store, deltas,<a name="line.7146"></a> +<span class="sourceLineNo">7147</span> null/*Default IsolationLevel*/,<a name="line.7147"></a> +<span class="sourceLineNo">7148</span> op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null);<a name="line.7148"></a> +<span class="sourceLineNo">7149</span> // Iterate the input columns and update existing values if they were found, otherwise<a name="line.7149"></a> +<span class="sourceLineNo">7150</span> // add new column initialized to the delta amount<a name="line.7150"></a> +<span class="sourceLineNo">7151</span> int currentValuesIndex = 0;<a name="line.7151"></a> +<span class="sourceLineNo">7152</span> for (int i = 0; i < deltas.size(); i++) {<a name="line.7152"></a> +<span class="sourceLineNo">7153</span> Cell delta = deltas.get(i);<a name="line.7153"></a> +<span class="sourceLineNo">7154</span> Cell currentValue = null;<a name="line.7154"></a> +<span class="sourceLineNo">7155</span> if (currentValuesIndex < currentValues.size() &&<a name="line.7155"></a> +<span class="sourceLineNo">7156</span> CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {<a name="line.7156"></a> +<span class="sourceLineNo">7157</span> currentValue = currentValues.get(currentValuesIndex);<a name="line.7157"></a> +<span class="sourceLineNo">7158</span> if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {<a name="line.7158"></a> +<span class="sourceLineNo">7159</span> currentValuesIndex++;<a name="line.7159"></a> +<span class="sourceLineNo">7160</span> }<a name="line.7160"></a> +<span class="sourceLineNo">7161</span> }<a name="line.7161"></a> +<span class="sourceLineNo">7162</span> // Switch on whether this an increment or an append building the new Cell to apply.<a name="line.7162"></a> +<span class="sourceLineNo">7163</span> Cell newCell = null;<a name="line.7163"></a> +<span class="sourceLineNo">7164</span> MutationType mutationType = null;<a name="line.7164"></a> +<span class="sourceLineNo">7165</span> boolean apply = true;<a name="line.7165"></a> +<span class="sourceLineNo">7166</span> switch (op) {<a name="line.7166"></a> +<span class="sourceLineNo">7167</span> case INCREMENT:<a name="line.7167"></a> +<span class="sourceLineNo">7168</span> mutationType = MutationType.INCREMENT;<a name="line.7168"></a> +<span class="sourceLineNo">7169</span> // If delta amount to apply is 0, don't write WAL or MemStore.<a name="line.7169"></a> +<span class="sourceLineNo">7170</span> long deltaAmount = getLongValue(delta);<a name="line.7170"></a> +<span class="sourceLineNo">7171</span> apply = deltaAmount != 0;<a name="line.7171"></a> +<span class="sourceLineNo">7172</span> newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now,<a name="line.7172"></a> +<span class="sourceLineNo">7173</span> (Increment)mutation);<a name="line.7173"></a> +<span class="sourceLineNo">7174</span> break;<a name="line.7174"></a> +<span class="sourceLineNo">7175</span> case APPEND:<a name="line.7175"></a> +<span class="sourceLineNo">7176</span> mutationType = MutationType.APPEND;<a name="line.7176"></a> +<span class="sourceLineNo">7177</span> // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to.<a name="line.7177"></a> +<span class="sourceLineNo">7178</span> newCell = reckonAppend(delta, currentValue, now, (Append)mutation);<a name="line.7178"></a> +<span class="sourceLineNo">7179</span> break;<a name="line.7179"></a> +<span class="sourceLineNo">7180</span> default: throw new UnsupportedOperationException(op.toString());<a name="line.7180"></a> +<span class="sourceLineNo">7181</span> }<a name="line.7181"></a> +<span class="sourceLineNo">7182</span><a name="line.7182"></a> +<span class="sourceLineNo">7183</span> // Give coprocessors a chance to update the new cell<a name="line.7183"></a> +<span class="sourceLineNo">7184</span> if (coprocessorHost != null) {<a name="line.7184"></a> +<span class="sourceLineNo">7185</span> newCell =<a name="line.7185"></a> +<span class="sourceLineNo">7186</span> coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);<a name="line.7186"></a> +<span class="sourceLineNo">7187</span> }<a name="line.7187"></a> +<span class="sourceLineNo">7188</span> // If apply, we need to update memstore/WAL with new value; add it toApply.<a name="line.7188"></a> +<span class="sourceLineNo">7189</span> if (apply) {<a name="line.7189"></a> +<span class="sourceLineNo">7190</span> toApply.add(newCell);<a name="line.7190"></a> +<span class="sourceLineNo">7191</span> }<a name="line.7191"></a> +<span class="sourceLineNo">7192</span> // Add to results to get returned to the Client. If null, cilent does not want results.<a name="line.7192"></a> +<span class="sourceLineNo">7193</span> if (results != null) {<a name="line.7193"></a> +<span class="sourceLineNo">7194</span> results.add(newCell);<a name="line.7194"></a> +<span class="sourceLineNo">7195</span> }<a name="line.7195"></a> +<span class="sourceLineNo">7196</span> }<a name="line.7196"></a> +<span class="sourceLineNo">7197</span> return toApply;<a name="line.7197"></a> +<span class="sourceLineNo">7198</span> }<a name="line.7198"></a> +<span class="sourceLineNo">7199</span><a name="line.7199"></a> +<span class="sourceLineNo">7200</span> /**<a name="line.7200"></a> +<span class="sourceLineNo">7201</span> * Calculate new Increment Cell.<a name="line.7201"></a> +<span class="sourceLineNo">7202</span> * @return New Increment Cell with delta applied to currentValue if currentValue is not null;<a name="line.7202"></a> +<span class="sourceLineNo">7203</span> * otherwise, a new Cell with the delta set as its value.<a name="line.7203"></a> +<span class="sourceLineNo">7204</span> */<a name="line.7204"></a> +<span class="sourceLineNo">7205</span> private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue,<a name="line.7205"></a> +<span class="sourceLineNo">7206</span> byte [] columnFamily, final long now, Mutation mutation)<a name="line.7206"></a> +<span class="sourceLineNo">7207</span> throws IOException {<a name="line.7207"></a> +<span class="sourceLineNo">7208</span> // Forward any tags found on the delta.<a name="line.7208"></a> +<span class="sourceLineNo">7209</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7209"></a> +<span class="sourceLineNo">7210</span> long newValue = deltaAmount;<a name="line.7210"></a> +<span class="sourceLineNo">7211</span> long ts = now;<a name="line.7211"></a> +<span class="sourceLineNo">7212</span> if (currentValue != null) {<a name="line.7212"></a> +<span class="sourceLineNo">7213</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7213"></a> +<span class="sourceLineNo">7214</span> ts = Math.max(now, currentValue.getTimestamp());<a name="line.7214"></a> +<span class="sourceLineNo">7215</span> newValue += getLongValue(currentValue);<a name="line.7215"></a> +<span class="sourceLineNo">7216</span> }<a name="line.7216"></a> +<span class="sourceLineNo">7217</span> // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made...<a name="line.7217"></a> +<span class="sourceLineNo">7218</span> // doesn't work well with offheaping or if we are doing a different Cell type.<a name="line.7218"></a> +<span class="sourceLineNo">7219</span> byte [] incrementAmountInBytes = Bytes.toBytes(newValue);<a name="line.7219"></a> +<span class="sourceLineNo">7220</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7220"></a> +<span class="sourceLineNo">7221</span> byte [] row = mutation.getRow();<a name="line.7221"></a> +<span class="sourceLineNo">7222</span> return new KeyValue(row, 0, row.length,<a name="line.7222"></a> +<span class="sourceLineNo">7223</span> columnFamily, 0, columnFamily.length,<a name="line.7223"></a> +<span class="sourceLineNo">7224</span> delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(),<a name="line.7224"></a> +<span class="sourceLineNo">7225</span> ts, KeyValue.Type.Put,<a name="line.7225"></a> +<span class="sourceLineNo">7226</span> incrementAmountInBytes, 0, incrementAmountInBytes.length,<a name="line.7226"></a> +<span class="sourceLineNo">7227</span> tags);<a name="line.7227"></a> +<span class="sourceLineNo">7228</span> }<a name="line.7228"></a> +<span class="sourceLineNo">7229</span><a name="line.7229"></a> +<span class="sourceLineNo">7230</span> private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now,<a name="line.7230"></a> +<span class="sourceLineNo">7231</span> Append mutation)<a name="line.7231"></a> +<span class="sourceLineNo">7232</span> throws IOException {<a name="line.7232"></a> +<span class="sourceLineNo">7233</span> // Forward any tags found on the delta.<a name="line.7233"></a> +<span class="sourceLineNo">7234</span> List<Tag> tags = TagUtil.carryForwardTags(delta);<a name="line.7234"></a> +<span class="sourceLineNo">7235</span> long ts = now;<a name="line.7235"></a> +<span class="sourceLineNo">7236</span> Cell newCell = null;<a name="line.7236"></a> +<span class="sourceLineNo">7237</span> byte [] row = mutation.getRow();<a name="line.7237"></a> +<span class="sourceLineNo">7238</span> if (currentValue != null) {<a name="line.7238"></a> +<span class="sourceLineNo">7239</span> tags = TagUtil.carryForwardTags(tags, currentValue);<a name="line.7239"></a> +<span class="sourceLineNo">7240</span> ts = Math.max(now, currentValue.getTimestamp());<a name="line.7240"></a> +<span class="sourceLineNo">7241</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7241"></a> +<span class="sourceLineNo">7242</span> byte[] tagBytes = TagUtil.fromList(tags);<a name="line.7242"></a> +<span class="sourceLineNo">7243</span> // Allocate an empty cell and copy in all parts.<a name="line.7243"></a> +<span class="sourceLineNo">7244</span> // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing<a name="line.7244"></a> +<span class="sourceLineNo">7245</span> // other Cell types. Copying on-heap too if an off-heap Cell.<a name="line.7245"></a> +<span class="sourceLineNo">7246</span> newCell = new KeyValue(row.length, delta.getFamilyLength(),<a name="line.7246"></a> +<span class="sourceLineNo">7247</span> delta.getQualifierLength(), ts, KeyValue.Type.Put,<a name="line.7247"></a> +<span class="sourceLineNo">7248</span> delta.getValueLength() + currentValue.getValueLength(),<a name="line.7248"></a> +<span class="sourceLineNo">7249</span> tagBytes == null? 0: tagBytes.length);<a name="line.7249"></a> +<span class="sourceLineNo">7250</span> // Copy in row, family, and qualifier<a name="line.7250"></a> +<span class="sourceLineNo">7251</span> System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length);<a name="line.7251"></a> +<span class="sourceLineNo">7252</span> System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(),<a name="line.7252"></a> +<span class="sourceLineNo">7253</span> newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength());<a name="line.7253"></a> +<span class="sourceLineNo">7254</span> System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(),<a name="line.7254"></a> +<span class="sourceLineNo">7255</span> newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength());<a name="line.7255"></a> +<span class="sourceLineNo">7256</span> // Copy in the value<a name="line.7256"></a> +<span class="sourceLineNo">7257</span> CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset());<a name="line.7257"></a> +<span class="sourceLineNo">7258</span> System.arraycopy(delta.getValueArray(), delta.getValueOffset(),<a name="line.7258"></a> +<span class="sourceLineNo">7259</span> newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(),<a name="line.7259"></a> +<span class="sourceLineNo">7260</span> delta.getValueLength());<a name="line.7260"></a> +<span class="sourceLineNo">7261</span> // Copy in tag data<a name="line.7261"></a> +<span class="sourceLineNo">7262</span> if (tagBytes != null) {<a name="line.7262"></a> +<span class="sourceLineNo">7263</span> System.arraycopy(tagBytes, 0,<a name="line.7263"></a> +<span class="sourceLineNo">7264</span> newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length);<a name="line.7264"></a> +<span class="sourceLineNo">7265</span> }<a name="line.7265"></a> +<span class="sourceLineNo">7266</span> } else {<a name="line.7266"></a> +<span class="sourceLineNo">7267</span> // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP<a name="line.7267"></a> +<span class="sourceLineNo">7268</span> CellUtil.updateLatestStamp(delta, now);<a name="line.7268"></a> +<span class="sourceLineNo">7269</span> newCell = delta;<a name="line.7269"></a> +<span class="sourceLineNo">7270</span> tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());<a name="line.7270"></a> +<span class="sourceLineNo">7271</span> if (tags != null) {<a name="line.7271"></a> +<span class="sourceLineNo">7272</span> newCell = new TagRewriteCell(delta, TagUtil.fromList(tags));<a name="line.7272"></a> +<span class="sourceLineNo">7273</span> }<a name="line.7273"></a> +<span class="sourceLineNo">7274</span> }<a name="line.7274"></a> +<span class="sourceLineNo">7275</span> return newCell;<a name="line.7275"></a> +<span class="sourceLineNo">7276</span> }<a name="line.7276"></a> +<span class="sourceLineNo">7277</span><a name="line.7277"></a> +<span class="sourceLineNo">7278</span> /**<a name="line.7278"></a> +<span class="sourceLineNo">7279</span> * @return Get the long out of the passed in Cell<a name="line.7279"></a> +<span class="sourceLineNo">7280</span> */<a name="line.7280"></a> +<span class="sourceLineNo">7281</span> private static long getLongValue(final Cell cell) throws DoNotRetryIOException {<a name="line.7281"></a> +<span class="sourceLineNo">7282</span> int len = cell.getValueLength();<a name="line.7282"></a> +<span class="sourceLineNo">7283</span> if (len != Bytes.SIZEOF_LONG) {<a name="line.7283"></a> +<span class="sourceLineNo">7284</span> // throw DoNotRetryIOException instead of IllegalArgumentException<a name="line.7284"></a> +<span class="sourceLineNo">7285</span> throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");<a name="line.7285"></a> +<span class="sourceLineNo">7286</span> }<a name="line.7286"></a> +<span class="sourceLineNo">7287</span> return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);<a name="line.7287"></a> +<span class="sourceLineNo">7288</span> }<a name="line.7288"></a> +<span class="sourceLineNo">7289</span><a name="line.7289"></a> +<span class="sourceLineNo">7290</span> /**<a name="line.7290"></a> +<span class="sourceLineNo">7291</span> * Do a specific Get on passed <code>columnFamily</code> and column qualifiers.<a name="line.7291"></a> +<span class="sourceLineNo">7292</span> * @param mutation Mutation we are doing this Get for.<a name="line.7292"></a> +<span class="sourceLineNo">7293</span> * @param columnFamily Which column family on row (TODO: Go all Gets in one go)<a name="line.7293"></a> +<span class="sourceLineNo">7294</span> * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.<a name="line.7294"></a> +<span class="sourceLineNo">7295</span> * @return Return list of Cells found.<a name="line.7295"></a> +<span class="sourceLineNo">7296</span> */<a name="line.7296"></a> +<span class="sourceLineNo">7297</span> private List<Cell> get(final Mutation mutation, final Store store,<a name="line.7297"></a> +<span class="sourceLineNo">7298</span> final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)<a name="line.7298"></a> +<span class="sourceLineNo">7299</span> throws IOException {<a name="line.7299"></a> +<span class="sourceLineNo">7300</span> // Sort the cells so that they match the order that they appear in the Get results. Otherwise,<a name="line.7300"></a> +<span class="sourceLineNo">7301</span> // we won't be able to find the existing values if the cells are not specified in order by the<a name="line.7301"></a> +<span class="sourceLineNo">7302</span> // client since cells are in an array list.<a name="line.7302"></a> +<span class="sourceLineNo">7303</span> // TODO: I don't get why we are sorting. St.Ack 20150107<a name="line.7303"></a> +<span class="sourceLineNo">7304</span> sort(coordinates, store.getComparator());<a name="line.7304"></a> +<span class="sourceLineNo">7305</span> Get get = new Get(mutation.getRow());<a name="line.7305"></a> +<span class="sourceLineNo">7306</span> if (isolation != null) {<a name="line.7306"></a> +<span class="sourceLineNo">7307</span> get.setIsolationLevel(isolation);<a name="line.7307"></a> +<span class="sourceLineNo">7308</span> }<a name="line.7308"></a> +<span class="sourceLineNo">7309</span> for (Cell cell: coordinates) {<a name="line.7309"></a> +<span class="sourceLineNo">7310</span> get.addColumn(store.getFamily().getName(), CellUtil.cloneQualifier(cell));<a name="line.7310"></a> +<span class="sourceLineNo">7311</span> }<a name="line.7311"></a> +<span class="sourceLineNo">7312</span> // Increments carry time range. If an Increment instance, put it on the Get.<a name="line.7312"></a> +<span class="sourceLineNo">7313</span> if (tr != null) {<a name="line.7313"></a> +<span class="sourceLineNo">7314</span> get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.7314"></a> +<span class="sourceLineNo">7315</span> }<a name="line.7315"></a> +<span class="sourceLineNo">7316</span> return get(get, false);<a name="line.7316"></a> +<span class="sourceLineNo">7317</span> }<a name="line.7317"></a> +<span class="sourceLineNo">7318</span><a name="line.7318"></a> +<span class="sourceLineNo">7319</span> /**<a name="line.7319"></a> +<span class="sourceLineNo">7320</span> * @return Sorted list of <code>cells</code> using <code>comparator</code><a name="line.7320"></a> +<span class="sourceLineNo">7321</span> */<a name="line.7321"></a> +<span class="sourceLineNo">7322</span> private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {<a name="line.7322"></a> +<span class="sourceLineNo">7323</span> Collections.sort(cells, comparator);<a name="line.7323"></a> +<span class="sourceLineNo">7324</span> return cells;<a name="line.7324"></a> +<span class="sourceLineNo">7325</span> }<a name="line.7325"></a> +<span class="sourceLineNo">7326</span><a name="line.7326"></a> <span class="sourceLineNo">7327</span> //<a name="line.7327"></a> -<span class="sourceLineNo">7328</span><a name="line.7328"></a> -<span class="sourceLineNo">7329</span> void checkFamily(final byte [] family)<a name="line.7329"></a> -<span class="sourceLineNo">7330</span> throws NoSuchColumnFamilyException {<a name="line.7330"></a> -<span class="sourceLineNo">7331</span> if (!this.htableDescriptor.hasFamily(family)) {<a name="line.7331"></a> -<span class="sourceLineNo">7332</span> throw new NoSuchColumnFamilyException("Column family " +<a name="line.7332"></a> -<span class="sourceLineNo">7333</span> Bytes.toString(family) + " does not exist in region " + this<a name="line.7333"></a> -<span class="sourceLineNo">7334</span> + " in table " + this.htableDescriptor);<a name="line.7334"></a> -<span class="sourceLineNo">7335</span> }<a name="line.7335"></a> -<span class="sourceLineNo">7336</span> }<a name="line.7336"></a> -<span class="sourceLineNo">7337</span><a name="line.7337"></a> -<span class="sourceLineNo">7338</span> public static final long FIXED_OVERHEAD = ClassSize.align(<a name="line.7338"></a> -<span class="sourceLineNo">7339</span> ClassSize.OBJECT +<a name="line.7339"></a> -<span class="sourceLineNo">7340</span> ClassSize.ARRAY +<a name="line.7340"></a> -<span class="sourceLineNo">7341</span> 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +<a name="line.7341"></a> -<span class="sourceLineNo">7342</span> (14 * Bytes.SIZEOF_LONG) +<a name="line.7342"></a> -<span class="sourceLineNo">7343</span> 5 * Bytes.SIZEOF_BOOLEAN);<a name="line.7343"></a> -<span class="sourceLineNo">7344</span><a name="line.7344"></a> -<span class="sourceLineNo">7345</span> // woefully out of date - currently missing:<a name="line.7345"></a> -<span class="sourceLineNo">7346</span> // 1 x HashMap - coprocessorServiceHandlers<a name="line.7346"></a> -<span class="sourceLineNo">7347</span> // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,<a name="line.7347"></a> -<span class="sourceLineNo">7348</span> // checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,<a name="line.7348"></a> -<span class="sourceLineNo">7349</span> // writeRequestsCount<a name="line.7349"></a> -<span class="sourceLineNo">7350</span> // 1 x HRegion$WriteState - writestate<a name="line.7350"></a> -<span class="sourceLineNo">7351</span> // 1 x RegionCoprocessorHost - coprocessorHost<a name="line.7351"></a> -<span class="sourceLineNo">7352</span> // 1 x RegionSplitPolicy - splitPolicy<a name="line.7352"></a> -<span class="sourceLineNo">7353</span> // 1 x MetricsRegion - metricsRegion<a name="line.7353"></a> -<span class="sourceLineNo">7354</span> // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper<a name="line.7354"></a> -<span class="sourceLineNo">7355</span> public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +<a name="line.7355"></a> -<span class="sourceLineNo">7356</span> ClassSize.OBJECT + // closeLock<a name="line.7356"></a> -<span class="sourceLineNo">7357</span> (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing<a name="line.7357"></a> -<span class="sourceLineNo">7358</span> (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL<a name="line.7358"></a> -<span class="sourceLineNo">7359</span> (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints<a name="line.7359"></a> -<span class="sourceLineNo">7360</span> WriteState.HEAP_SIZE + // writestate<a name="line.7360"></a> -<span class="sourceLineNo">7361</span> ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores<a name="line.7361"></a> -<span class="sourceLineNo">7362</span> (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock<a name="line.7362"></a> -<span class="sourceLineNo">7363</span> MultiVersionConcurrencyControl.FIXED_SIZE // mvcc<a name="line.7363"></a> -<span class="sourceLineNo">7364</span> + ClassSize.TREEMAP // maxSeqIdInStores<a name="line.7364"></a> -<span class="sourceLineNo">7365</span> + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress<a name="line.7365"></a> -<span class="sourceLineNo">7366</span> ;<a name="line.7366"></a> -<span class="sourceLineNo">7367</span><a name="line.7367"></a> -<span class="sourceLineNo">7368</span> @Override<a name="line.7368"></a> -<span class="sourceLineNo">7369</span> public long heapSize() {<a name="line.7369"></a> -<span class="sourceLineNo">7370</span> long heapSize = DEEP_OVERHEAD;<a name="line.7370"></a> -<span class="sourceLineNo">7371</span> for (Store store : this.stores.values()) {<a name="line.7371"></a> -<span class="sourceLineNo">7372</span> heapSize += store.heapSize();<a name="line.7372"></a> -<span class="sourceLineNo">7373</span> }<a name="line.7373"></a> -<span class="sourceLineNo">7374</span> // this does not take into account row locks, recent flushes, mvcc entries, and more<a name="line.7374"></a> -<span class="sourceLineNo">7375</span> return heapSize;<a name="line.7375"></a> -<span class="sourceLineNo">7376</span> }<a name="line.7376"></a> -<span class="sourceLineNo">7377</span><a name="line.7377"></a> -<span class="sourceLineNo">7378</span> @Override<a name="line.7378"></a> -<span class="sourceLineNo">7379</span> public boolean registerService(Service instance) {<a name="line.7379"></a> -<span class="sourceLineNo">7380</span> /*<a name="line.7380"></a> -<span class="sourceLineNo">7381</span> * No stacking of instances is allowed for a single service name<a name="line.7381"></a> -<span class="sourceLineNo">7382</span> */<a name="line.7382"></a> -<span class="sourceLineNo">7383</span> Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();<a name="line.7383"></a> -<span class="sourceLineNo">7384</span> if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {<a name="line.7384"></a> -<span class="sourceLineNo">7385</span> LOG.error("Coprocessor service " + serviceDesc.getFullName() +<a name="line.7385"></a> -<span class="sourceLineNo">7386</span> " already registered, rejecting request from " + instance<a name="line.7386"></a> -<span class="sourceLineNo">7387</span> );<a name="line.7387"></a> -<span class="sourceLineNo">7388</span> return false;<a name="line.7388"></a> -<span class="sourceLineNo">7389</span> }<a name="line.7389"></a> -<span class="sourceLineNo">7390</span><a name="line.7390"></a> -<span class="sourceLineNo">7391</span> coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);<a name="line.7391"></a> -<span class="sourceLineNo">7392</span> if (LOG.isDebugEnabled()) {<a name="line.7392"></a> -<span class="sourceLineNo">7393</span> LOG.debug("Registered coprocessor service: region=" +<a name="line.7393"></a> -<span class="sourceLineNo">7394</span> Bytes.toStringBinary(getRegionInfo().getRegionName()) +<a name="line.7394"></a> -<span class="sourceLineNo">7395</span> " service=" + serviceDesc.getFullName());<a name="line.7395"></a> -<span class="sourceLineNo">7396</span> }<a name="line.7396"></a> -<span class="sourceLineNo">7397</span> return true;<a name="line.7397"></a> -<span class="sourceLineNo">7398</span> }<a name="line.7398"></a> -<span class="sourceLineNo">7399</span><a name="line.7399"></a> -<span class="sourceLineNo">7400</span> @Override<a name="line.7400"></a> -<span class="sourceLineNo">7401</span> public Message execService(RpcController controller, CoprocessorServiceCall call)<a name="line.7401"></a> -<span class="sourceLineNo">7402</span> throws IOException {<a name="line.7402"></a> -<span class="sourceLineNo">7403</span> String serviceName = call.getServiceName();<a name="line.7403"></a> -<span class="sourceLineNo">7404</span> String methodName = call.getMethodName();<a name="line.7404"></a> -<span class="sourceLineNo">7405</span> if (!coprocessorServiceHandlers.containsKey(serviceName)) {<a name="line.7405"></a> -<span class="sourceLineNo">7406</span> throw new UnknownProtocolException(null,<a name="line.7406"></a> -<span class="sourceLineNo">7407</span> "No registered coprocessor service found for name "+serviceName+<a name="line.7407"></a> -<span class="sourceLineNo">7408</span> " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));<a name="line.7408"></a> -<span class="sourceLineNo">7409</span> }<a name="line.7409"></a> -<span class="sourceLineNo">7410</span><a name="line.7410"></a> -<span class="sourceLineNo">7411</span> Service service = coprocessorServiceHandlers.get(serviceName);<a name="line.7411"></a> -<span class="sourceLineNo">7412</span> Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();<a name="line.7412"></a> -<span class="sourceLineNo">7413</span> Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);<a name="line.7413"></a> -<span class="sourceLineNo">7414</span> if (methodDesc == null) {<a name="line.7414"></a> -<span class="sourceLineNo">7415</span> throw new UnknownProtocolException(service.getClass(),<a name="line.7415"></a> -<span class="sourceLineNo">7416</span> "Unknown method "+methodName+" called on service "+serviceName+<a name="line.7416"></a> -<span class="sourceLineNo">7417</span> " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));<a name="line.7417"></a> -<span class="sourceLineNo">7418</span> }<a name="line.7418"></a> -<span class="sourceLineNo">7419</span><a name="line.7419"></a> -<span class="sourceLineNo">7420</span> Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();<a name="line.7420"></a> -<span class="sourceLineNo">7421</span> ProtobufUtil.mergeFrom(builder, call.getRequest());<a name="line.7421"></a> -<span class="sourceLineNo">7422</span> Message request = builder.build();<a name="line.7422"></a> -<span class="sourceLineNo">7423</span><a name="line.7423"></a> -<span class="sourceLineNo">7424</span> if (coprocessorHost != null) {<a name="line.7424"></a> -<span class="sourceLineNo">7425</span> request = coprocessorHost.preEndpointInvocation(service, methodName, request);<a name="line.7425"></a> -<span class="sourceLineNo">7426</span> }<a name="line.7426"></a> -<span class="sourceLineNo">7427</span><a name="line.7427"></a> -<span class="sourceLineNo">7428</span> final Message.Builder responseBuilder =<a name="line.7428"></a> -<span class="sourceLineNo">7429</span> service.getResponsePrototype(methodDesc).newBuilderForType();<a name="line.7429"></a> -<span class="sourceLineNo">7430</span> service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {<a name="line.7430"></a> -<span class="sourceLineNo">7431</span> @Override<a name="line.7431"></a> -<span class="sourceLineNo">7432</span> public void run(Message message) {<a name="line.7432"></a> -<span class="sourceLineNo">7433</span> if (message != null) {<a name="line.7433"></a> -<span class="sourceLineNo">7434</span> responseBuilder.mergeFrom(message);<a name="line.7434"></a> -<span class="sourceLineNo">7435</span> }<a name="line.7435"></a> -<span class="sourceLineNo">7436</span> }<a name="line.7436"></a> -<span class="sourceLineNo">7437</span> });<a name="line.7437"></a> -<span class="sourceLineNo">7438</span><a name="line.7438"></a> -<span class="sourceLineNo">7439</span> if (coprocessorHost != null) {<a name="line.7439"></a> -<span class="sourceLineNo">7440</span> coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);<a name="line.7440"></a> -<span class="sourceLineNo">7441</span> }<a name="line.7441"></a> -<span class="sourceLineNo">7442</span><a name="line.7442"></a> -<span class="sourceLineNo">7443</span> IOException exception = ResponseConverter.getControllerException(controller);<a name="line.7443"></a> -<span class="sourceLineNo">7444</span> if (exception != null) {<a name="line.7444"></a> -<span class="sourceLineNo">7445</span> throw exception;<a name="line.7445"></a> -<span class="sourceLineNo">7446</span> }<a name="line.7446"
<TRUNCATED>