http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html index e2c4389..d03724e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html @@ -5185,3056 +5185,3058 @@ <span class="sourceLineNo">5177</span> * @param readLock is the lock reader or writer. True indicates that a non-exlcusive<a name="line.5177"></a> <span class="sourceLineNo">5178</span> * lock is requested<a name="line.5178"></a> <span class="sourceLineNo">5179</span> */<a name="line.5179"></a> -<span class="sourceLineNo">5180</span> public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {<a name="line.5180"></a> -<span class="sourceLineNo">5181</span> // Make sure the row is inside of this region before getting the lock for it.<a name="line.5181"></a> -<span class="sourceLineNo">5182</span> checkRow(row, "row lock");<a name="line.5182"></a> -<span class="sourceLineNo">5183</span> // create an object to use a a key in the row lock map<a name="line.5183"></a> -<span class="sourceLineNo">5184</span> HashedBytes rowKey = new HashedBytes(row);<a name="line.5184"></a> -<span class="sourceLineNo">5185</span><a name="line.5185"></a> -<span class="sourceLineNo">5186</span> RowLockContext rowLockContext = null;<a name="line.5186"></a> -<span class="sourceLineNo">5187</span> RowLockImpl result = null;<a name="line.5187"></a> -<span class="sourceLineNo">5188</span> TraceScope traceScope = null;<a name="line.5188"></a> -<span class="sourceLineNo">5189</span><a name="line.5189"></a> -<span class="sourceLineNo">5190</span> // If we're tracing start a span to show how long this took.<a name="line.5190"></a> -<span class="sourceLineNo">5191</span> if (Trace.isTracing()) {<a name="line.5191"></a> -<span class="sourceLineNo">5192</span> traceScope = Trace.startSpan("HRegion.getRowLock");<a name="line.5192"></a> -<span class="sourceLineNo">5193</span> traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));<a name="line.5193"></a> -<span class="sourceLineNo">5194</span> }<a name="line.5194"></a> -<span class="sourceLineNo">5195</span><a name="line.5195"></a> -<span class="sourceLineNo">5196</span> try {<a name="line.5196"></a> -<span class="sourceLineNo">5197</span> // Keep trying until we have a lock or error out.<a name="line.5197"></a> -<span class="sourceLineNo">5198</span> // TODO: do we need to add a time component here?<a name="line.5198"></a> -<span class="sourceLineNo">5199</span> while (result == null) {<a name="line.5199"></a> -<span class="sourceLineNo">5200</span><a name="line.5200"></a> -<span class="sourceLineNo">5201</span> // Try adding a RowLockContext to the lockedRows.<a name="line.5201"></a> -<span class="sourceLineNo">5202</span> // If we can add it then there's no other transactions currently running.<a name="line.5202"></a> -<span class="sourceLineNo">5203</span> rowLockContext = new RowLockContext(rowKey);<a name="line.5203"></a> -<span class="sourceLineNo">5204</span> RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);<a name="line.5204"></a> -<span class="sourceLineNo">5205</span><a name="line.5205"></a> -<span class="sourceLineNo">5206</span> // if there was a running transaction then there's already a context.<a name="line.5206"></a> -<span class="sourceLineNo">5207</span> if (existingContext != null) {<a name="line.5207"></a> -<span class="sourceLineNo">5208</span> rowLockContext = existingContext;<a name="line.5208"></a> -<span class="sourceLineNo">5209</span> }<a name="line.5209"></a> -<span class="sourceLineNo">5210</span><a name="line.5210"></a> -<span class="sourceLineNo">5211</span> // Now try an get the lock.<a name="line.5211"></a> -<span class="sourceLineNo">5212</span> //<a name="line.5212"></a> -<span class="sourceLineNo">5213</span> // This can fail as<a name="line.5213"></a> -<span class="sourceLineNo">5214</span> if (readLock) {<a name="line.5214"></a> -<span class="sourceLineNo">5215</span> result = rowLockContext.newReadLock();<a name="line.5215"></a> -<span class="sourceLineNo">5216</span> } else {<a name="line.5216"></a> -<span class="sourceLineNo">5217</span> result = rowLockContext.newWriteLock();<a name="line.5217"></a> -<span class="sourceLineNo">5218</span> }<a name="line.5218"></a> -<span class="sourceLineNo">5219</span> }<a name="line.5219"></a> -<span class="sourceLineNo">5220</span> if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {<a name="line.5220"></a> -<span class="sourceLineNo">5221</span> if (traceScope != null) {<a name="line.5221"></a> -<span class="sourceLineNo">5222</span> traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");<a name="line.5222"></a> -<span class="sourceLineNo">5223</span> }<a name="line.5223"></a> -<span class="sourceLineNo">5224</span> result = null;<a name="line.5224"></a> -<span class="sourceLineNo">5225</span> // Clean up the counts just in case this was the thing keeping the context alive.<a name="line.5225"></a> -<span class="sourceLineNo">5226</span> rowLockContext.cleanUp();<a name="line.5226"></a> -<span class="sourceLineNo">5227</span> throw new IOException("Timed out waiting for lock for row: " + rowKey);<a name="line.5227"></a> -<span class="sourceLineNo">5228</span> }<a name="line.5228"></a> -<span class="sourceLineNo">5229</span> return result;<a name="line.5229"></a> -<span class="sourceLineNo">5230</span> } catch (InterruptedException ie) {<a name="line.5230"></a> -<span class="sourceLineNo">5231</span> LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);<a name="line.5231"></a> -<span class="sourceLineNo">5232</span> InterruptedIOException iie = new InterruptedIOException();<a name="line.5232"></a> -<span class="sourceLineNo">5233</span> iie.initCause(ie);<a name="line.5233"></a> -<span class="sourceLineNo">5234</span> if (traceScope != null) {<a name="line.5234"></a> -<span class="sourceLineNo">5235</span> traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");<a name="line.5235"></a> -<span class="sourceLineNo">5236</span> }<a name="line.5236"></a> -<span class="sourceLineNo">5237</span> Thread.currentThread().interrupt();<a name="line.5237"></a> -<span class="sourceLineNo">5238</span> throw iie;<a name="line.5238"></a> -<span class="sourceLineNo">5239</span> } finally {<a name="line.5239"></a> -<span class="sourceLineNo">5240</span> if (traceScope != null) {<a name="line.5240"></a> -<span class="sourceLineNo">5241</span> traceScope.close();<a name="line.5241"></a> -<span class="sourceLineNo">5242</span> }<a name="line.5242"></a> -<span class="sourceLineNo">5243</span> }<a name="line.5243"></a> -<span class="sourceLineNo">5244</span> }<a name="line.5244"></a> -<span class="sourceLineNo">5245</span><a name="line.5245"></a> -<span class="sourceLineNo">5246</span> @Override<a name="line.5246"></a> -<span class="sourceLineNo">5247</span> public void releaseRowLocks(List<RowLock> rowLocks) {<a name="line.5247"></a> -<span class="sourceLineNo">5248</span> if (rowLocks != null) {<a name="line.5248"></a> -<span class="sourceLineNo">5249</span> for (RowLock rowLock : rowLocks) {<a name="line.5249"></a> -<span class="sourceLineNo">5250</span> rowLock.release();<a name="line.5250"></a> -<span class="sourceLineNo">5251</span> }<a name="line.5251"></a> -<span class="sourceLineNo">5252</span> rowLocks.clear();<a name="line.5252"></a> -<span class="sourceLineNo">5253</span> }<a name="line.5253"></a> -<span class="sourceLineNo">5254</span> }<a name="line.5254"></a> -<span class="sourceLineNo">5255</span><a name="line.5255"></a> -<span class="sourceLineNo">5256</span> @VisibleForTesting<a name="line.5256"></a> -<span class="sourceLineNo">5257</span> class RowLockContext {<a name="line.5257"></a> -<span class="sourceLineNo">5258</span> private final HashedBytes row;<a name="line.5258"></a> -<span class="sourceLineNo">5259</span> final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);<a name="line.5259"></a> -<span class="sourceLineNo">5260</span> final AtomicBoolean usable = new AtomicBoolean(true);<a name="line.5260"></a> -<span class="sourceLineNo">5261</span> final AtomicInteger count = new AtomicInteger(0);<a name="line.5261"></a> -<span class="sourceLineNo">5262</span> final Object lock = new Object();<a name="line.5262"></a> -<span class="sourceLineNo">5263</span><a name="line.5263"></a> -<span class="sourceLineNo">5264</span> RowLockContext(HashedBytes row) {<a name="line.5264"></a> -<span class="sourceLineNo">5265</span> this.row = row;<a name="line.5265"></a> -<span class="sourceLineNo">5266</span> }<a name="line.5266"></a> -<span class="sourceLineNo">5267</span><a name="line.5267"></a> -<span class="sourceLineNo">5268</span> RowLockImpl newWriteLock() {<a name="line.5268"></a> -<span class="sourceLineNo">5269</span> Lock l = readWriteLock.writeLock();<a name="line.5269"></a> -<span class="sourceLineNo">5270</span> return getRowLock(l);<a name="line.5270"></a> -<span class="sourceLineNo">5271</span> }<a name="line.5271"></a> -<span class="sourceLineNo">5272</span> RowLockImpl newReadLock() {<a name="line.5272"></a> -<span class="sourceLineNo">5273</span> Lock l = readWriteLock.readLock();<a name="line.5273"></a> -<span class="sourceLineNo">5274</span> return getRowLock(l);<a name="line.5274"></a> -<span class="sourceLineNo">5275</span> }<a name="line.5275"></a> -<span class="sourceLineNo">5276</span><a name="line.5276"></a> -<span class="sourceLineNo">5277</span> private RowLockImpl getRowLock(Lock l) {<a name="line.5277"></a> -<span class="sourceLineNo">5278</span> count.incrementAndGet();<a name="line.5278"></a> -<span class="sourceLineNo">5279</span> synchronized (lock) {<a name="line.5279"></a> -<span class="sourceLineNo">5280</span> if (usable.get()) {<a name="line.5280"></a> -<span class="sourceLineNo">5281</span> return new RowLockImpl(this, l);<a name="line.5281"></a> -<span class="sourceLineNo">5282</span> } else {<a name="line.5282"></a> -<span class="sourceLineNo">5283</span> return null;<a name="line.5283"></a> -<span class="sourceLineNo">5284</span> }<a name="line.5284"></a> -<span class="sourceLineNo">5285</span> }<a name="line.5285"></a> -<span class="sourceLineNo">5286</span> }<a name="line.5286"></a> -<span class="sourceLineNo">5287</span><a name="line.5287"></a> -<span class="sourceLineNo">5288</span> void cleanUp() {<a name="line.5288"></a> -<span class="sourceLineNo">5289</span> long c = count.decrementAndGet();<a name="line.5289"></a> -<span class="sourceLineNo">5290</span> if (c <= 0) {<a name="line.5290"></a> -<span class="sourceLineNo">5291</span> synchronized (lock) {<a name="line.5291"></a> -<span class="sourceLineNo">5292</span> if (count.get() <= 0 ){<a name="line.5292"></a> -<span class="sourceLineNo">5293</span> usable.set(false);<a name="line.5293"></a> -<span class="sourceLineNo">5294</span> RowLockContext removed = lockedRows.remove(row);<a name="line.5294"></a> -<span class="sourceLineNo">5295</span> assert removed == this: "we should never remove a different context";<a name="line.5295"></a> -<span class="sourceLineNo">5296</span> }<a name="line.5296"></a> -<span class="sourceLineNo">5297</span> }<a name="line.5297"></a> -<span class="sourceLineNo">5298</span> }<a name="line.5298"></a> -<span class="sourceLineNo">5299</span> }<a name="line.5299"></a> -<span class="sourceLineNo">5300</span><a name="line.5300"></a> -<span class="sourceLineNo">5301</span> @Override<a name="line.5301"></a> -<span class="sourceLineNo">5302</span> public String toString() {<a name="line.5302"></a> -<span class="sourceLineNo">5303</span> return "RowLockContext{" +<a name="line.5303"></a> -<span class="sourceLineNo">5304</span> "row=" + row +<a name="line.5304"></a> -<span class="sourceLineNo">5305</span> ", readWriteLock=" + readWriteLock +<a name="line.5305"></a> -<span class="sourceLineNo">5306</span> ", count=" + count +<a name="line.5306"></a> -<span class="sourceLineNo">5307</span> '}';<a name="line.5307"></a> -<span class="sourceLineNo">5308</span> }<a name="line.5308"></a> -<span class="sourceLineNo">5309</span> }<a name="line.5309"></a> -<span class="sourceLineNo">5310</span><a name="line.5310"></a> -<span class="sourceLineNo">5311</span> /**<a name="line.5311"></a> -<span class="sourceLineNo">5312</span> * Class used to represent a lock on a row.<a name="line.5312"></a> -<span class="sourceLineNo">5313</span> */<a name="line.5313"></a> -<span class="sourceLineNo">5314</span> public static class RowLockImpl implements RowLock {<a name="line.5314"></a> -<span class="sourceLineNo">5315</span> private final RowLockContext context;<a name="line.5315"></a> -<span class="sourceLineNo">5316</span> private final Lock lock;<a name="line.5316"></a> -<span class="sourceLineNo">5317</span><a name="line.5317"></a> -<span class="sourceLineNo">5318</span> public RowLockImpl(RowLockContext context, Lock lock) {<a name="line.5318"></a> -<span class="sourceLineNo">5319</span> this.context = context;<a name="line.5319"></a> -<span class="sourceLineNo">5320</span> this.lock = lock;<a name="line.5320"></a> -<span class="sourceLineNo">5321</span> }<a name="line.5321"></a> -<span class="sourceLineNo">5322</span><a name="line.5322"></a> -<span class="sourceLineNo">5323</span> public Lock getLock() {<a name="line.5323"></a> -<span class="sourceLineNo">5324</span> return lock;<a name="line.5324"></a> -<span class="sourceLineNo">5325</span> }<a name="line.5325"></a> -<span class="sourceLineNo">5326</span><a name="line.5326"></a> -<span class="sourceLineNo">5327</span> @VisibleForTesting<a name="line.5327"></a> -<span class="sourceLineNo">5328</span> public RowLockContext getContext() {<a name="line.5328"></a> -<span class="sourceLineNo">5329</span> return context;<a name="line.5329"></a> -<span class="sourceLineNo">5330</span> }<a name="line.5330"></a> -<span class="sourceLineNo">5331</span><a name="line.5331"></a> -<span class="sourceLineNo">5332</span> @Override<a name="line.5332"></a> -<span class="sourceLineNo">5333</span> public void release() {<a name="line.5333"></a> -<span class="sourceLineNo">5334</span> lock.unlock();<a name="line.5334"></a> -<span class="sourceLineNo">5335</span> context.cleanUp();<a name="line.5335"></a> -<span class="sourceLineNo">5336</span> }<a name="line.5336"></a> -<span class="sourceLineNo">5337</span><a name="line.5337"></a> -<span class="sourceLineNo">5338</span> @Override<a name="line.5338"></a> -<span class="sourceLineNo">5339</span> public String toString() {<a name="line.5339"></a> -<span class="sourceLineNo">5340</span> return "RowLockImpl{" +<a name="line.5340"></a> -<span class="sourceLineNo">5341</span> "context=" + context +<a name="line.5341"></a> -<span class="sourceLineNo">5342</span> ", lock=" + lock +<a name="line.5342"></a> -<span class="sourceLineNo">5343</span> '}';<a name="line.5343"></a> -<span class="sourceLineNo">5344</span> }<a name="line.5344"></a> -<span class="sourceLineNo">5345</span> }<a name="line.5345"></a> -<span class="sourceLineNo">5346</span><a name="line.5346"></a> -<span class="sourceLineNo">5347</span> /**<a name="line.5347"></a> -<span class="sourceLineNo">5348</span> * Determines whether multiple column families are present<a name="line.5348"></a> -<span class="sourceLineNo">5349</span> * Precondition: familyPaths is not null<a name="line.5349"></a> -<span class="sourceLineNo">5350</span> *<a name="line.5350"></a> -<span class="sourceLineNo">5351</span> * @param familyPaths List of (column family, hfilePath)<a name="line.5351"></a> -<span class="sourceLineNo">5352</span> */<a name="line.5352"></a> -<span class="sourceLineNo">5353</span> private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {<a name="line.5353"></a> -<span class="sourceLineNo">5354</span> boolean multipleFamilies = false;<a name="line.5354"></a> -<span class="sourceLineNo">5355</span> byte[] family = null;<a name="line.5355"></a> -<span class="sourceLineNo">5356</span> for (Pair<byte[], String> pair : familyPaths) {<a name="line.5356"></a> -<span class="sourceLineNo">5357</span> byte[] fam = pair.getFirst();<a name="line.5357"></a> -<span class="sourceLineNo">5358</span> if (family == null) {<a name="line.5358"></a> -<span class="sourceLineNo">5359</span> family = fam;<a name="line.5359"></a> -<span class="sourceLineNo">5360</span> } else if (!Bytes.equals(family, fam)) {<a name="line.5360"></a> -<span class="sourceLineNo">5361</span> multipleFamilies = true;<a name="line.5361"></a> -<span class="sourceLineNo">5362</span> break;<a name="line.5362"></a> -<span class="sourceLineNo">5363</span> }<a name="line.5363"></a> -<span class="sourceLineNo">5364</span> }<a name="line.5364"></a> -<span class="sourceLineNo">5365</span> return multipleFamilies;<a name="line.5365"></a> -<span class="sourceLineNo">5366</span> }<a name="line.5366"></a> -<span class="sourceLineNo">5367</span><a name="line.5367"></a> -<span class="sourceLineNo">5368</span> @Override<a name="line.5368"></a> -<span class="sourceLineNo">5369</span> public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,<a name="line.5369"></a> -<span class="sourceLineNo">5370</span> BulkLoadListener bulkLoadListener) throws IOException {<a name="line.5370"></a> -<span class="sourceLineNo">5371</span> long seqId = -1;<a name="line.5371"></a> -<span class="sourceLineNo">5372</span> Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);<a name="line.5372"></a> -<span class="sourceLineNo">5373</span> Preconditions.checkNotNull(familyPaths);<a name="line.5373"></a> -<span class="sourceLineNo">5374</span> // we need writeLock for multi-family bulk load<a name="line.5374"></a> -<span class="sourceLineNo">5375</span> startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));<a name="line.5375"></a> -<span class="sourceLineNo">5376</span> try {<a name="line.5376"></a> -<span class="sourceLineNo">5377</span> this.writeRequestsCount.increment();<a name="line.5377"></a> -<span class="sourceLineNo">5378</span><a name="line.5378"></a> -<span class="sourceLineNo">5379</span> // There possibly was a split that happened between when the split keys<a name="line.5379"></a> -<span class="sourceLineNo">5380</span> // were gathered and before the HRegion's write lock was taken. We need<a name="line.5380"></a> -<span class="sourceLineNo">5381</span> // to validate the HFile region before attempting to bulk load all of them<a name="line.5381"></a> -<span class="sourceLineNo">5382</span> List<IOException> ioes = new ArrayList<IOException>();<a name="line.5382"></a> -<span class="sourceLineNo">5383</span> List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();<a name="line.5383"></a> -<span class="sourceLineNo">5384</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5384"></a> -<span class="sourceLineNo">5385</span> byte[] familyName = p.getFirst();<a name="line.5385"></a> -<span class="sourceLineNo">5386</span> String path = p.getSecond();<a name="line.5386"></a> -<span class="sourceLineNo">5387</span><a name="line.5387"></a> -<span class="sourceLineNo">5388</span> Store store = getStore(familyName);<a name="line.5388"></a> -<span class="sourceLineNo">5389</span> if (store == null) {<a name="line.5389"></a> -<span class="sourceLineNo">5390</span> IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.5390"></a> -<span class="sourceLineNo">5391</span> "No such column family " + Bytes.toStringBinary(familyName));<a name="line.5391"></a> -<span class="sourceLineNo">5392</span> ioes.add(ioe);<a name="line.5392"></a> -<span class="sourceLineNo">5393</span> } else {<a name="line.5393"></a> -<span class="sourceLineNo">5394</span> try {<a name="line.5394"></a> -<span class="sourceLineNo">5395</span> store.assertBulkLoadHFileOk(new Path(path));<a name="line.5395"></a> -<span class="sourceLineNo">5396</span> } catch (WrongRegionException wre) {<a name="line.5396"></a> -<span class="sourceLineNo">5397</span> // recoverable (file doesn't fit in region)<a name="line.5397"></a> -<span class="sourceLineNo">5398</span> failures.add(p);<a name="line.5398"></a> -<span class="sourceLineNo">5399</span> } catch (IOException ioe) {<a name="line.5399"></a> -<span class="sourceLineNo">5400</span> // unrecoverable (hdfs problem)<a name="line.5400"></a> -<span class="sourceLineNo">5401</span> ioes.add(ioe);<a name="line.5401"></a> -<span class="sourceLineNo">5402</span> }<a name="line.5402"></a> -<span class="sourceLineNo">5403</span> }<a name="line.5403"></a> -<span class="sourceLineNo">5404</span> }<a name="line.5404"></a> -<span class="sourceLineNo">5405</span><a name="line.5405"></a> -<span class="sourceLineNo">5406</span> // validation failed because of some sort of IO problem.<a name="line.5406"></a> -<span class="sourceLineNo">5407</span> if (ioes.size() != 0) {<a name="line.5407"></a> -<span class="sourceLineNo">5408</span> IOException e = MultipleIOException.createIOException(ioes);<a name="line.5408"></a> -<span class="sourceLineNo">5409</span> LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);<a name="line.5409"></a> -<span class="sourceLineNo">5410</span> throw e;<a name="line.5410"></a> -<span class="sourceLineNo">5411</span> }<a name="line.5411"></a> -<span class="sourceLineNo">5412</span><a name="line.5412"></a> -<span class="sourceLineNo">5413</span> // validation failed, bail out before doing anything permanent.<a name="line.5413"></a> -<span class="sourceLineNo">5414</span> if (failures.size() != 0) {<a name="line.5414"></a> -<span class="sourceLineNo">5415</span> StringBuilder list = new StringBuilder();<a name="line.5415"></a> -<span class="sourceLineNo">5416</span> for (Pair<byte[], String> p : failures) {<a name="line.5416"></a> -<span class="sourceLineNo">5417</span> list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")<a name="line.5417"></a> -<span class="sourceLineNo">5418</span> .append(p.getSecond());<a name="line.5418"></a> -<span class="sourceLineNo">5419</span> }<a name="line.5419"></a> -<span class="sourceLineNo">5420</span> // problem when validating<a name="line.5420"></a> -<span class="sourceLineNo">5421</span> LOG.warn("There was a recoverable bulk load failure likely due to a" +<a name="line.5421"></a> -<span class="sourceLineNo">5422</span> " split. These (family, HFile) pairs were not loaded: " + list);<a name="line.5422"></a> -<span class="sourceLineNo">5423</span> return false;<a name="line.5423"></a> -<span class="sourceLineNo">5424</span> }<a name="line.5424"></a> -<span class="sourceLineNo">5425</span><a name="line.5425"></a> -<span class="sourceLineNo">5426</span> // We need to assign a sequential ID that's in between two memstores in order to preserve<a name="line.5426"></a> -<span class="sourceLineNo">5427</span> // the guarantee that all the edits lower than the highest sequential ID from all the<a name="line.5427"></a> -<span class="sourceLineNo">5428</span> // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is<a name="line.5428"></a> -<span class="sourceLineNo">5429</span> // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is<a name="line.5429"></a> -<span class="sourceLineNo">5430</span> // a sequence id that we can be sure is beyond the last hfile written).<a name="line.5430"></a> -<span class="sourceLineNo">5431</span> if (assignSeqId) {<a name="line.5431"></a> -<span class="sourceLineNo">5432</span> FlushResult fs = flushcache(true, false);<a name="line.5432"></a> -<span class="sourceLineNo">5433</span> if (fs.isFlushSucceeded()) {<a name="line.5433"></a> -<span class="sourceLineNo">5434</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5434"></a> -<span class="sourceLineNo">5435</span> } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {<a name="line.5435"></a> -<span class="sourceLineNo">5436</span> seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5436"></a> -<span class="sourceLineNo">5437</span> } else {<a name="line.5437"></a> -<span class="sourceLineNo">5438</span> throw new IOException("Could not bulk load with an assigned sequential ID because the "+<a name="line.5438"></a> -<span class="sourceLineNo">5439</span> "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);<a name="line.5439"></a> -<span class="sourceLineNo">5440</span> }<a name="line.5440"></a> -<span class="sourceLineNo">5441</span> }<a name="line.5441"></a> -<span class="sourceLineNo">5442</span><a name="line.5442"></a> -<span class="sourceLineNo">5443</span> for (Pair<byte[], String> p : familyPaths) {<a name="line.5443"></a> -<span class="sourceLineNo">5444</span> byte[] familyName = p.getFirst();<a name="line.5444"></a> -<span class="sourceLineNo">5445</span> String path = p.getSecond();<a name="line.5445"></a> -<span class="sourceLineNo">5446</span> Store store = getStore(familyName);<a name="line.5446"></a> -<span class="sourceLineNo">5447</span> try {<a name="line.5447"></a> -<span class="sourceLineNo">5448</span> String finalPath = path;<a name="line.5448"></a> -<span class="sourceLineNo">5449</span> if (bulkLoadListener != null) {<a name="line.5449"></a> -<span class="sourceLineNo">5450</span> finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);<a name="line.5450"></a> -<span class="sourceLineNo">5451</span> }<a name="line.5451"></a> -<span class="sourceLineNo">5452</span> Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);<a name="line.5452"></a> -<span class="sourceLineNo">5453</span><a name="line.5453"></a> -<span class="sourceLineNo">5454</span> if(storeFiles.containsKey(familyName)) {<a name="line.5454"></a> -<span class="sourceLineNo">5455</span> storeFiles.get(familyName).add(commitedStoreFile);<a name="line.5455"></a> -<span class="sourceLineNo">5456</span> } else {<a name="line.5456"></a> -<span class="sourceLineNo">5457</span> List<Path> storeFileNames = new ArrayList<Path>();<a name="line.5457"></a> -<span class="sourceLineNo">5458</span> storeFileNames.add(commitedStoreFile);<a name="line.5458"></a> -<span class="sourceLineNo">5459</span> storeFiles.put(familyName, storeFileNames);<a name="line.5459"></a> -<span class="sourceLineNo">5460</span> }<a name="line.5460"></a> -<span class="sourceLineNo">5461</span> if (bulkLoadListener != null) {<a name="line.5461"></a> -<span class="sourceLineNo">5462</span> bulkLoadListener.doneBulkLoad(familyName, path);<a name="line.5462"></a> -<span class="sourceLineNo">5463</span> }<a name="line.5463"></a> -<span class="sourceLineNo">5464</span> } catch (IOException ioe) {<a name="line.5464"></a> -<span class="sourceLineNo">5465</span> // A failure here can cause an atomicity violation that we currently<a name="line.5465"></a> -<span class="sourceLineNo">5466</span> // cannot recover from since it is likely a failed HDFS operation.<a name="line.5466"></a> -<span class="sourceLineNo">5467</span><a name="line.5467"></a> -<span class="sourceLineNo">5468</span> // TODO Need a better story for reverting partial failures due to HDFS.<a name="line.5468"></a> -<span class="sourceLineNo">5469</span> LOG.error("There was a partial failure due to IO when attempting to" +<a name="line.5469"></a> -<span class="sourceLineNo">5470</span> " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);<a name="line.5470"></a> -<span class="sourceLineNo">5471</span> if (bulkLoadListener != null) {<a name="line.5471"></a> -<span class="sourceLineNo">5472</span> try {<a name="line.5472"></a> -<span class="sourceLineNo">5473</span> bulkLoadListener.failedBulkLoad(familyName, path);<a name="line.5473"></a> -<span class="sourceLineNo">5474</span> } catch (Exception ex) {<a name="line.5474"></a> -<span class="sourceLineNo">5475</span> LOG.error("Error while calling failedBulkLoad for family " +<a name="line.5475"></a> -<span class="sourceLineNo">5476</span> Bytes.toString(familyName) + " with path " + path, ex);<a name="line.5476"></a> -<span class="sourceLineNo">5477</span> }<a name="line.5477"></a> -<span class="sourceLineNo">5478</span> }<a name="line.5478"></a> -<span class="sourceLineNo">5479</span> throw ioe;<a name="line.5479"></a> -<span class="sourceLineNo">5480</span> }<a name="line.5480"></a> -<span class="sourceLineNo">5481</span> }<a name="line.5481"></a> -<span class="sourceLineNo">5482</span><a name="line.5482"></a> -<span class="sourceLineNo">5483</span> return true;<a name="line.5483"></a> -<span class="sourceLineNo">5484</span> } finally {<a name="line.5484"></a> -<span class="sourceLineNo">5485</span> if (wal != null && !storeFiles.isEmpty()) {<a name="line.5485"></a> -<span class="sourceLineNo">5486</span> // write a bulk load event when not all hfiles are loaded<a name="line.5486"></a> -<span class="sourceLineNo">5487</span> try {<a name="line.5487"></a> -<span class="sourceLineNo">5488</span> WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(<a name="line.5488"></a> -<span class="sourceLineNo">5489</span> this.getRegionInfo().getTable(),<a name="line.5489"></a> -<span class="sourceLineNo">5490</span> ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);<a name="line.5490"></a> -<span class="sourceLineNo">5491</span> WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),<a name="line.5491"></a> -<span class="sourceLineNo">5492</span> loadDescriptor, mvcc);<a name="line.5492"></a> -<span class="sourceLineNo">5493</span> } catch (IOException ioe) {<a name="line.5493"></a> -<span class="sourceLineNo">5494</span> if (this.rsServices != null) {<a name="line.5494"></a> -<span class="sourceLineNo">5495</span> // Have to abort region server because some hfiles has been loaded but we can't write<a name="line.5495"></a> -<span class="sourceLineNo">5496</span> // the event into WAL<a name="line.5496"></a> -<span class="sourceLineNo">5497</span> this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);<a name="line.5497"></a> -<span class="sourceLineNo">5498</span> }<a name="line.5498"></a> -<span class="sourceLineNo">5499</span> }<a name="line.5499"></a> -<span class="sourceLineNo">5500</span> }<a name="line.5500"></a> -<span class="sourceLineNo">5501</span><a name="line.5501"></a> -<span class="sourceLineNo">5502</span> closeBulkRegionOperation();<a name="line.5502"></a> -<span class="sourceLineNo">5503</span> }<a name="line.5503"></a> -<span class="sourceLineNo">5504</span> }<a name="line.5504"></a> -<span class="sourceLineNo">5505</span><a name="line.5505"></a> -<span class="sourceLineNo">5506</span> @Override<a name="line.5506"></a> -<span class="sourceLineNo">5507</span> public boolean equals(Object o) {<a name="line.5507"></a> -<span class="sourceLineNo">5508</span> return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),<a name="line.5508"></a> -<span class="sourceLineNo">5509</span> ((HRegion) o).getRegionInfo().getRegionName());<a name="line.5509"></a> -<span class="sourceLineNo">5510</span> }<a name="line.5510"></a> -<span class="sourceLineNo">5511</span><a name="line.5511"></a> -<span class="sourceLineNo">5512</span> @Override<a name="line.5512"></a> -<span class="sourceLineNo">5513</span> public int hashCode() {<a name="line.5513"></a> -<span class="sourceLineNo">5514</span> return Bytes.hashCode(getRegionInfo().getRegionName());<a name="line.5514"></a> -<span class="sourceLineNo">5515</span> }<a name="line.5515"></a> -<span class="sourceLineNo">5516</span><a name="line.5516"></a> -<span class="sourceLineNo">5517</span> @Override<a name="line.5517"></a> -<span class="sourceLineNo">5518</span> public String toString() {<a name="line.5518"></a> -<span class="sourceLineNo">5519</span> return getRegionInfo().getRegionNameAsString();<a name="line.5519"></a> -<span class="sourceLineNo">5520</span> }<a name="line.5520"></a> -<span class="sourceLineNo">5521</span><a name="line.5521"></a> -<span class="sourceLineNo">5522</span> /**<a name="line.5522"></a> -<span class="sourceLineNo">5523</span> * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5523"></a> -<span class="sourceLineNo">5524</span> */<a name="line.5524"></a> -<span class="sourceLineNo">5525</span> class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5525"></a> -<span class="sourceLineNo">5526</span> // Package local for testability<a name="line.5526"></a> -<span class="sourceLineNo">5527</span> KeyValueHeap storeHeap = null;<a name="line.5527"></a> -<span class="sourceLineNo">5528</span> /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5528"></a> -<span class="sourceLineNo">5529</span> * on demand, if on-demand column family loading is enabled.*/<a name="line.5529"></a> -<span class="sourceLineNo">5530</span> KeyValueHeap joinedHeap = null;<a name="line.5530"></a> -<span class="sourceLineNo">5531</span> /**<a name="line.5531"></a> -<span class="sourceLineNo">5532</span> * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5532"></a> -<span class="sourceLineNo">5533</span> * contain the row for which we are populating the values.*/<a name="line.5533"></a> -<span class="sourceLineNo">5534</span> protected Cell joinedContinuationRow = null;<a name="line.5534"></a> -<span class="sourceLineNo">5535</span> private boolean filterClosed = false;<a name="line.5535"></a> -<span class="sourceLineNo">5536</span><a name="line.5536"></a> -<span class="sourceLineNo">5537</span> protected final int isScan;<a name="line.5537"></a> -<span class="sourceLineNo">5538</span> protected final byte[] stopRow;<a name="line.5538"></a> -<span class="sourceLineNo">5539</span> protected final HRegion region;<a name="line.5539"></a> -<span class="sourceLineNo">5540</span> protected final CellComparator comparator;<a name="line.5540"></a> -<span class="sourceLineNo">5541</span> protected boolean copyCellsFromSharedMem = false;<a name="line.5541"></a> -<span class="sourceLineNo">5542</span><a name="line.5542"></a> -<span class="sourceLineNo">5543</span> private final long readPt;<a name="line.5543"></a> -<span class="sourceLineNo">5544</span> private final long maxResultSize;<a name="line.5544"></a> -<span class="sourceLineNo">5545</span> private final ScannerContext defaultScannerContext;<a name="line.5545"></a> -<span class="sourceLineNo">5546</span> private final FilterWrapper filter;<a name="line.5546"></a> -<span class="sourceLineNo">5547</span><a name="line.5547"></a> -<span class="sourceLineNo">5548</span> @Override<a name="line.5548"></a> -<span class="sourceLineNo">5549</span> public HRegionInfo getRegionInfo() {<a name="line.5549"></a> -<span class="sourceLineNo">5550</span> return region.getRegionInfo();<a name="line.5550"></a> -<span class="sourceLineNo">5551</span> }<a name="line.5551"></a> -<span class="sourceLineNo">5552</span><a name="line.5552"></a> -<span class="sourceLineNo">5553</span> public void setCopyCellsFromSharedMem(boolean copyCells) {<a name="line.5553"></a> -<span class="sourceLineNo">5554</span> this.copyCellsFromSharedMem = copyCells;<a name="line.5554"></a> -<span class="sourceLineNo">5555</span> }<a name="line.5555"></a> -<span class="sourceLineNo">5556</span><a name="line.5556"></a> -<span class="sourceLineNo">5557</span> RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,<a name="line.5557"></a> -<span class="sourceLineNo">5558</span> boolean copyCellsFromSharedMem)<a name="line.5558"></a> -<span class="sourceLineNo">5559</span> throws IOException {<a name="line.5559"></a> -<span class="sourceLineNo">5560</span> this.region = region;<a name="line.5560"></a> -<span class="sourceLineNo">5561</span> this.maxResultSize = scan.getMaxResultSize();<a name="line.5561"></a> -<span class="sourceLineNo">5562</span> if (scan.hasFilter()) {<a name="line.5562"></a> -<span class="sourceLineNo">5563</span> this.filter = new FilterWrapper(scan.getFilter());<a name="line.5563"></a> -<span class="sourceLineNo">5564</span> } else {<a name="line.5564"></a> -<span class="sourceLineNo">5565</span> this.filter = null;<a name="line.5565"></a> -<span class="sourceLineNo">5566</span> }<a name="line.5566"></a> -<span class="sourceLineNo">5567</span> this.comparator = region.getCellCompartor();<a name="line.5567"></a> -<span class="sourceLineNo">5568</span> /**<a name="line.5568"></a> -<span class="sourceLineNo">5569</span> * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5569"></a> -<span class="sourceLineNo">5570</span> * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5570"></a> -<span class="sourceLineNo">5571</span> * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5571"></a> -<span class="sourceLineNo">5572</span> */<a name="line.5572"></a> -<span class="sourceLineNo">5573</span> defaultScannerContext = ScannerContext.newBuilder()<a name="line.5573"></a> -<span class="sourceLineNo">5574</span> .setBatchLimit(scan.getBatch()).build();<a name="line.5574"></a> -<span class="sourceLineNo">5575</span><a name="line.5575"></a> -<span class="sourceLineNo">5576</span> if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {<a name="line.5576"></a> -<span class="sourceLineNo">5577</span> this.stopRow = null;<a name="line.5577"></a> -<span class="sourceLineNo">5578</span> } else {<a name="line.5578"></a> -<span class="sourceLineNo">5579</span> this.stopRow = scan.getStopRow();<a name="line.5579"></a> -<span class="sourceLineNo">5580</span> }<a name="line.5580"></a> -<span class="sourceLineNo">5581</span> // If we are doing a get, we want to be [startRow,endRow]. Normally<a name="line.5581"></a> -<span class="sourceLineNo">5582</span> // it is [startRow,endRow) and if startRow=endRow we get nothing.<a name="line.5582"></a> -<span class="sourceLineNo">5583</span> this.isScan = scan.isGetScan() ? 1 : 0;<a name="line.5583"></a> -<span class="sourceLineNo">5584</span><a name="line.5584"></a> -<span class="sourceLineNo">5585</span> // synchronize on scannerReadPoints so that nobody calculates<a name="line.5585"></a> -<span class="sourceLineNo">5586</span> // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5586"></a> -<span class="sourceLineNo">5587</span> IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5587"></a> -<span class="sourceLineNo">5588</span> synchronized(scannerReadPoints) {<a name="line.5588"></a> -<span class="sourceLineNo">5589</span> this.readPt = getReadpoint(isolationLevel);<a name="line.5589"></a> -<span class="sourceLineNo">5590</span> scannerReadPoints.put(this, this.readPt);<a name="line.5590"></a> -<span class="sourceLineNo">5591</span> }<a name="line.5591"></a> -<span class="sourceLineNo">5592</span><a name="line.5592"></a> -<span class="sourceLineNo">5593</span> // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5593"></a> -<span class="sourceLineNo">5594</span> // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5594"></a> -<span class="sourceLineNo">5595</span> List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();<a name="line.5595"></a> -<span class="sourceLineNo">5596</span> List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();<a name="line.5596"></a> -<span class="sourceLineNo">5597</span> if (additionalScanners != null) {<a name="line.5597"></a> -<span class="sourceLineNo">5598</span> scanners.addAll(additionalScanners);<a name="line.5598"></a> -<span class="sourceLineNo">5599</span> }<a name="line.5599"></a> -<span class="sourceLineNo">5600</span><a name="line.5600"></a> -<span class="sourceLineNo">5601</span> for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {<a name="line.5601"></a> -<span class="sourceLineNo">5602</span> Store store = stores.get(entry.getKey());<a name="line.5602"></a> -<span class="sourceLineNo">5603</span> KeyValueScanner scanner;<a name="line.5603"></a> -<span class="sourceLineNo">5604</span> try {<a name="line.5604"></a> -<span class="sourceLineNo">5605</span> scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5605"></a> -<span class="sourceLineNo">5606</span> } catch (FileNotFoundException e) {<a name="line.5606"></a> -<span class="sourceLineNo">5607</span> throw handleFileNotFound(e);<a name="line.5607"></a> -<span class="sourceLineNo">5608</span> }<a name="line.5608"></a> -<span class="sourceLineNo">5609</span> if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5609"></a> -<span class="sourceLineNo">5610</span> || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5610"></a> -<span class="sourceLineNo">5611</span> scanners.add(scanner);<a name="line.5611"></a> -<span class="sourceLineNo">5612</span> } else {<a name="line.5612"></a> -<span class="sourceLineNo">5613</span> joinedScanners.add(scanner);<a name="line.5613"></a> -<span class="sourceLineNo">5614</span> }<a name="line.5614"></a> -<span class="sourceLineNo">5615</span> }<a name="line.5615"></a> -<span class="sourceLineNo">5616</span> this.copyCellsFromSharedMem = copyCellsFromSharedMem;<a name="line.5616"></a> -<span class="sourceLineNo">5617</span> initializeKVHeap(scanners, joinedScanners, region);<a name="line.5617"></a> -<span class="sourceLineNo">5618</span> }<a name="line.5618"></a> -<span class="sourceLineNo">5619</span><a name="line.5619"></a> -<span class="sourceLineNo">5620</span> protected void initializeKVHeap(List<KeyValueScanner> scanners,<a name="line.5620"></a> -<span class="sourceLineNo">5621</span> List<KeyValueScanner> joinedScanners, HRegion region)<a name="line.5621"></a> -<span class="sourceLineNo">5622</span> throws IOException {<a name="line.5622"></a> -<span class="sourceLineNo">5623</span> this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5623"></a> -<span class="sourceLineNo">5624</span> if (!joinedScanners.isEmpty()) {<a name="line.5624"></a> -<span class="sourceLineNo">5625</span> this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5625"></a> -<span class="sourceLineNo">5626</span> }<a name="line.5626"></a> -<span class="sourceLineNo">5627</span> }<a name="line.5627"></a> -<span class="sourceLineNo">5628</span><a name="line.5628"></a> -<span class="sourceLineNo">5629</span> @Override<a name="line.5629"></a> -<span class="sourceLineNo">5630</span> public long getMaxResultSize() {<a name="line.5630"></a> -<span class="sourceLineNo">5631</span> return maxResultSize;<a name="line.5631"></a> -<span class="sourceLineNo">5632</span> }<a name="line.5632"></a> -<span class="sourceLineNo">5633</span><a name="line.5633"></a> -<span class="sourceLineNo">5634</span> @Override<a name="line.5634"></a> -<span class="sourceLineNo">5635</span> public long getMvccReadPoint() {<a name="line.5635"></a> -<span class="sourceLineNo">5636</span> return this.readPt;<a name="line.5636"></a> -<span class="sourceLineNo">5637</span> }<a name="line.5637"></a> -<span class="sourceLineNo">5638</span><a name="line.5638"></a> -<span class="sourceLineNo">5639</span> @Override<a name="line.5639"></a> -<span class="sourceLineNo">5640</span> public int getBatch() {<a name="line.5640"></a> -<span class="sourceLineNo">5641</span> return this.defaultScannerContext.getBatchLimit();<a name="line.5641"></a> -<span class="sourceLineNo">5642</span> }<a name="line.5642"></a> -<span class="sourceLineNo">5643</span><a name="line.5643"></a> -<span class="sourceLineNo">5644</span> /**<a name="line.5644"></a> -<span class="sourceLineNo">5645</span> * Reset both the filter and the old filter.<a name="line.5645"></a> -<span class="sourceLineNo">5646</span> *<a name="line.5646"></a> -<span class="sourceLineNo">5647</span> * @throws IOException in case a filter raises an I/O exception.<a name="line.5647"></a> -<span class="sourceLineNo">5648</span> */<a name="line.5648"></a> -<span class="sourceLineNo">5649</span> protected void resetFilters() throws IOException {<a name="line.5649"></a> -<span class="sourceLineNo">5650</span> if (filter != null) {<a name="line.5650"></a> -<span class="sourceLineNo">5651</span> filter.reset();<a name="line.5651"></a> -<span class="sourceLineNo">5652</span> }<a name="line.5652"></a> -<span class="sourceLineNo">5653</span> }<a name="line.5653"></a> -<span class="sourceLineNo">5654</span><a name="line.5654"></a> -<span class="sourceLineNo">5655</span> @Override<a name="line.5655"></a> -<span class="sourceLineNo">5656</span> public boolean next(List<Cell> outResults)<a name="line.5656"></a> -<span class="sourceLineNo">5657</span> throws IOException {<a name="line.5657"></a> -<span class="sourceLineNo">5658</span> // apply the batching limit by default<a name="line.5658"></a> -<span class="sourceLineNo">5659</span> return next(outResults, defaultScannerContext);<a name="line.5659"></a> -<span class="sourceLineNo">5660</span> }<a name="line.5660"></a> -<span class="sourceLineNo">5661</span><a name="line.5661"></a> -<span class="sourceLineNo">5662</span> @Override<a name="line.5662"></a> -<span class="sourceLineNo">5663</span> public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5663"></a> -<span class="sourceLineNo">5664</span> throws IOException {<a name="line.5664"></a> -<span class="sourceLineNo">5665</span> if (this.filterClosed) {<a name="line.5665"></a> -<span class="sourceLineNo">5666</span> throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.5666"></a> -<span class="sourceLineNo">5667</span> "after we renewed it. Could be caused by a very slow scanner " +<a name="line.5667"></a> -<span class="sourceLineNo">5668</span> "or a lengthy garbage collection");<a name="line.5668"></a> -<span class="sourceLineNo">5669</span> }<a name="line.5669"></a> -<span class="sourceLineNo">5670</span> startRegionOperation(Operation.SCAN);<a name="line.5670"></a> -<span class="sourceLineNo">5671</span> readRequestsCount.increment();<a name="line.5671"></a> -<span class="sourceLineNo">5672</span> try {<a name="line.5672"></a> -<span class="sourceLineNo">5673</span> return nextRaw(outResults, scannerContext);<a name="line.5673"></a> -<span class="sourceLineNo">5674</span> } finally {<a name="line.5674"></a> -<span class="sourceLineNo">5675</span> closeRegionOperation(Operation.SCAN);<a name="line.5675"></a> -<span class="sourceLineNo">5676</span> }<a name="line.5676"></a> -<span class="sourceLineNo">5677</span> }<a name="line.5677"></a> -<span class="sourceLineNo">5678</span><a name="line.5678"></a> -<span class="sourceLineNo">5679</span> @Override<a name="line.5679"></a> -<span class="sourceLineNo">5680</span> public boolean nextRaw(List<Cell> outResults) throws IOException {<a name="line.5680"></a> -<span class="sourceLineNo">5681</span> // Use the RegionScanner's context by default<a name="line.5681"></a> -<span class="sourceLineNo">5682</span> return nextRaw(outResults, defaultScannerContext);<a name="line.5682"></a> -<span class="sourceLineNo">5683</span> }<a name="line.5683"></a> -<span class="sourceLineNo">5684</span><a name="line.5684"></a> -<span class="sourceLineNo">5685</span> @Override<a name="line.5685"></a> -<span class="sourceLineNo">5686</span> public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)<a name="line.5686"></a> -<span class="sourceLineNo">5687</span> throws IOException {<a name="line.5687"></a> -<span class="sourceLineNo">5688</span> if (storeHeap == null) {<a name="line.5688"></a> -<span class="sourceLineNo">5689</span> // scanner is closed<a name="line.5689"></a> -<span class="sourceLineNo">5690</span> throw new UnknownScannerException("Scanner was closed");<a name="line.5690"></a> -<span class="sourceLineNo">5691</span> }<a name="line.5691"></a> -<span class="sourceLineNo">5692</span> boolean moreValues = false;<a name="line.5692"></a> -<span class="sourceLineNo">5693</span> try {<a name="line.5693"></a> -<span class="sourceLineNo">5694</span> if (outResults.isEmpty()) {<a name="line.5694"></a> -<span class="sourceLineNo">5695</span> // Usually outResults is empty. This is true when next is called<a name="line.5695"></a> -<span class="sourceLineNo">5696</span> // to handle scan or get operation.<a name="line.5696"></a> -<span class="sourceLineNo">5697</span> moreValues = nextInternal(outResults, scannerContext);<a name="line.5697"></a> -<span class="sourceLineNo">5698</span> } else {<a name="line.5698"></a> -<span class="sourceLineNo">5699</span> List<Cell> tmpList = new ArrayList<Cell>();<a name="line.5699"></a> -<span class="sourceLineNo">5700</span> moreValues = nextInternal(tmpList, scannerContext);<a name="line.5700"></a> -<span class="sourceLineNo">5701</span> outResults.addAll(tmpList);<a name="line.5701"></a> -<span class="sourceLineNo">5702</span> }<a name="line.5702"></a> -<span class="sourceLineNo">5703</span><a name="line.5703"></a> -<span class="sourceLineNo">5704</span> // If the size limit was reached it means a partial Result is being<a name="line.5704"></a> -<span class="sourceLineNo">5705</span> // returned. Returning a<a name="line.5705"></a> -<span class="sourceLineNo">5706</span> // partial Result means that we should not reset the filters; filters<a name="line.5706"></a> -<span class="sourceLineNo">5707</span> // should only be reset in<a name="line.5707"></a> -<span class="sourceLineNo">5708</span> // between rows<a name="line.5708"></a> -<span class="sourceLineNo">5709</span> if (!scannerContext.partialResultFormed()) resetFilters();<a name="line.5709"></a> -<span class="sourceLineNo">5710</span><a name="line.5710"></a> -<span class="sourceLineNo">5711</span> if (isFilterDoneInternal()) {<a name="line.5711"></a> -<span class="sourceLineNo">5712</span> moreValues = false;<a name="line.5712"></a> -<span class="sourceLineNo">5713</span> }<a name="line.5713"></a> -<span class="sourceLineNo">5714</span><a name="line.5714"></a> -<span class="sourceLineNo">5715</span> // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise<a name="line.5715"></a> -<span class="sourceLineNo">5716</span> // it is a call coming from the RsRpcServices.scan().<a name="line.5716"></a> -<span class="sourceLineNo">5717</span> if (copyCellsFromSharedMem && !outResults.isEmpty()) {<a name="line.5717"></a> -<span class="sourceLineNo">5718</span> // Do the copy of the results here.<a name="line.5718"></a> -<span class="sourceLineNo">5719</span> ListIterator<Cell> listItr = outResults.listIterator();<a name="line.5719"></a> -<span class="sourceLineNo">5720</span> Cell cell = null;<a name="line.5720"></a> -<span class="sourceLineNo">5721</span> while (listItr.hasNext()) {<a name="line.5721"></a> -<span class="sourceLineNo">5722</span> cell = listItr.next();<a name="line.5722"></a> -<span class="sourceLineNo">5723</span> if (cell instanceof ShareableMemory) {<a name="line.5723"></a> -<span class="sourceLineNo">5724</span> listItr.set(((ShareableMemory) cell).cloneToCell());<a name="line.5724"></a> -<span class="sourceLineNo">5725</span> }<a name="line.5725"></a> -<span class="sourceLineNo">5726</span> }<a name="line.5726"></a> -<span class="sourceLineNo">5727</span> }<a name="line.5727"></a> -<span class="sourceLineNo">5728</span> } finally {<a name="line.5728"></a> -<span class="sourceLineNo">5729</span> if (copyCellsFromSharedMem) {<a name="line.5729"></a> -<span class="sourceLineNo">5730</span> // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return<a name="line.5730"></a> -<span class="sourceLineNo">5731</span> // the blocks then and there (for wrapped CPs)<a name="line.5731"></a> -<span class="sourceLineNo">5732</span> this.shipped();<a name="line.5732"></a> -<span class="sourceLineNo">5733</span> }<a name="line.5733"></a> -<span class="sourceLineNo">5734</span> }<a name="line.5734"></a> -<span class="sourceLineNo">5735</span> return moreValues;<a name="line.5735"></a> -<span class="sourceLineNo">5736</span> }<a name="line.5736"></a> -<span class="sourceLineNo">5737</span><a name="line.5737"></a> -<span class="sourceLineNo">5738</span> /**<a name="line.5738"></a> -<span class="sourceLineNo">5739</span> * @return true if more cells exist after this batch, false if scanner is done<a name="line.5739"></a> -<span class="sourceLineNo">5740</span> */<a name="line.5740"></a> -<span class="sourceLineNo">5741</span> private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)<a name="line.5741"></a> -<span class="sourceLineNo">5742</span> throws IOException {<a name="line.5742"></a> -<span class="sourceLineNo">5743</span> assert joinedContinuationRow != null;<a name="line.5743"></a> -<span class="sourceLineNo">5744</span> boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.5744"></a> -<span class="sourceLineNo">5745</span> joinedContinuationRow);<a name="line.5745"></a> -<span class="sourceLineNo">5746</span><a name="line.5746"></a> -<span class="sourceLineNo">5747</span> if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5747"></a> -<span class="sourceLineNo">5748</span> // We are done with this row, reset the continuation.<a name="line.5748"></a> -<span class="sourceLineNo">5749</span> joinedContinuationRow = null;<a name="line.5749"></a> -<span class="sourceLineNo">5750</span> }<a name="line.5750"></a> -<span class="sourceLineNo">5751</span> // As the data is obtained from two independent heaps, we need to<a name="line.5751"></a> -<span class="sourceLineNo">5752</span> // ensure that result list is sorted, because Result relies on that.<a name="line.5752"></a> -<span class="sourceLineNo">5753</span> Collections.sort(results, comparator);<a name="line.5753"></a> -<span class="sourceLineNo">5754</span> return moreValues;<a name="line.5754"></a> -<span class="sourceLineNo">5755</span> }<a name="line.5755"></a> -<span class="sourceLineNo">5756</span><a name="line.5756"></a> -<span class="sourceLineNo">5757</span> /**<a name="line.5757"></a> -<span class="sourceLineNo">5758</span> * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.5758"></a> -<span class="sourceLineNo">5759</span> * reached, or remainingResultSize (if not -1) is reaced<a name="line.5759"></a> -<span class="sourceLineNo">5760</span> * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.5760"></a> -<span class="sourceLineNo">5761</span> * @param scannerContext<a name="line.5761"></a> -<span class="sourceLineNo">5762</span> * @param currentRowCell<a name="line.5762"></a> -<span class="sourceLineNo">5763</span> * @return state of last call to {@link KeyValueHeap#next()}<a name="line.5763"></a> -<span class="sourceLineNo">5764</span> */<a name="line.5764"></a> -<span class="sourceLineNo">5765</span> private boolean populateResult(List<Cell> results, KeyValueHeap heap,<a name="line.5765"></a> -<span class="sourceLineNo">5766</span> ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.5766"></a> -<span class="sourceLineNo">5767</span> Cell nextKv;<a name="line.5767"></a> -<span class="sourceLineNo">5768</span> boolean moreCellsInRow = false;<a name="line.5768"></a> -<span class="sourceLineNo">5769</span> boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.5769"></a> -<span class="sourceLineNo">5770</span> // Scanning between column families and thus the scope is between cells<a name="line.5770"></a> -<span class="sourceLineNo">5771</span> LimitScope limitScope = LimitScope.BETWEEN_CELLS;<a name="line.5771"></a> -<span class="sourceLineNo">5772</span> try {<a name="line.5772"></a> -<span class="sourceLineNo">5773</span> do {<a name="line.5773"></a> -<span class="sourceLineNo">5774</span> // We want to maintain any progress that is made towards the limits while scanning across<a name="line.5774"></a> -<span class="sourceLineNo">5775</span> // different column families. To do this, we toggle the keep progress flag on during calls<a name="line.5775"></a> -<span class="sourceLineNo">5776</span> // to the StoreScanner to ensure that any progress made thus far is not wiped away.<a name="line.5776"></a> -<span class="sourceLineNo">5777</span> scannerContext.setKeepProgress(true);<a name="line.5777"></a> -<span class="sourceLineNo">5778</span> heap.next(results, scannerContext);<a name="line.5778"></a> -<span class="sourceLineNo">5779</span> scannerContext.setKeepProgress(tmpKeepProgress);<a name="line.5779"></a> -<span class="sourceLineNo">5780</span><a name="line.5780"></a> -<span class="sourceLineNo">5781</span> nextKv = heap.peek();<a name="line.5781"></a> -<span class="sourceLineNo">5782</span> moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);<a name="line.5782"></a> -<span class="sourceLineNo">5783</span> if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);<a name="line.5783"></a> -<span class="sourceLineNo">5784</span> if (scannerContext.checkBatchLimit(limitScope)) {<a name="line.5784"></a> -<span class="sourceLineNo">5785</span> return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();<a name="line.5785"></a> -<span class="sourceLineNo">5786</span> } else if (scannerContext.checkSizeLimit(limitScope)) {<a name="line.5786"></a> -<span class="sourceLineNo">5787</span> ScannerContext.NextState state =<a name="line.5787"></a> -<span class="sourceLineNo">5788</span> moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;<a name="line.5788"></a> -<span class="sourceLineNo">5789</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5789"></a> -<span class="sourceLineNo">5790</span> } else if (scannerContext.checkTimeLimit(limitScope)) {<a name="line.5790"></a> -<span class="sourceLineNo">5791</span> ScannerContext.NextState state =<a name="line.5791"></a> -<span class="sourceLineNo">5792</span> moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;<a name="line.5792"></a> -<span class="sourceLineNo">5793</span> return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5793"></a> -<span class="sourceLineNo">5794</span> }<a name="line.5794"></a> -<span class="sourceLineNo">5795</span> } while (moreCellsInRow);<a name="line.5795"></a> -<span class="sourceLineNo">5796</span> } catch (FileNotFoundException e) {<a name="line.5796"></a> -<span class="sourceLineNo">5797</span> throw handleFileNotFound(e);<a name="line.5797"></a> -<span class="sourceLineNo">5798</span> }<a name="line.5798"></a> -<span class="sourceLineNo">5799</span> return nextKv != null;<a name="line.5799"></a> -<span class="sourceLineNo">5800</span> }<a name="line.5800"></a> -<span class="sourceLineNo">5801</span><a name="line.5801"></a> -<span class="sourceLineNo">5802</span> /**<a name="line.5802"></a> -<span class="sourceLineNo">5803</span> * Based on the nextKv in the heap, and the current row, decide whether or not there are more<a name="line.5803"></a> -<span class="sourceLineNo">5804</span> * cells to be read in the heap. If the row of the nextKv in the heap matches the current row<a name="line.5804"></a> -<span class="sourceLineNo">5805</span> * then there are more cells to be read in the row.<a name="line.5805"></a> -<span class="sourceLineNo">5806</span> * @param nextKv<a name="line.5806"></a> -<span class="sourceLineNo">5807</span> * @param currentRowCell<a name="line.5807"></a> -<span class="sourceLineNo">5808</span> * @return true When there are more cells in the row to be read<a name="line.5808"></a> -<span class="sourceLineNo">5809</span> */<a name="line.5809"></a> -<span class="sourceLineNo">5810</span> private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {<a name="line.5810"></a> -<span class="sourceLineNo">5811</span> return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell);<a name="line.5811"></a> -<span class="sourceLineNo">5812</span> }<a name="line.5812"></a> -<span class="sourceLineNo">5813</span><a name="line.5813"></a> -<span class="sourceLineNo">5814</span> /*<a name="line.5814"></a> -<span class="sourceLineNo">5815</span> * @return True if a filter rules the scanner is over, done.<a name="line.5815"></a> -<span class="sourceLineNo">5816</span> */<a name="line.5816"></a> -<span class="sourceLineNo">5817</span> @Override<a name="line.5817"></a> -<span class="sourceLineNo">5818</span> public synchronized boolean isFilterDone() throws IOException {<a name="line.5818"></a> -<span class="sourceLineNo">5819</span> return isFilterDoneInternal();<a name="line.5819"></a> -<span class="sourceLineNo">5820</span> }<a name="line.5820"></a> -<span class="sourceLineNo">5821</span><a name="line.5821"></a> -<span class="sourceLineNo">5822</span> private boolean isFilterDoneInternal() throws IOException {<a name="line.5822"></a> -<span class="sourceLineNo">5823</span> return this.filter != null && this.filter.filterAllRemaining();<a name="line.5823"></a> -<span class="sourceLineNo">5824</span> }<a name="line.5824"></a> -<span class="sourceLineNo">5825</span><a name="line.5825"></a> -<span class="sourceLineNo">5826</span> private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)<a name="line.5826"></a> -<span class="sourceLineNo">5827</span> throws IOException {<a name="line.5827"></a> -<span class="sourceLineNo">5828</span> if (!results.isEmpty()) {<a name="line.5828"></a> -<span class="sourceLineNo">5829</span> throw new IllegalArgumentException("First parameter should be an empty list");<a name="line.5829"></a> -<span class="sourceLineNo">5830</span> }<a name="line.5830"></a> -<span class="sourceLineNo">5831</span> if (scannerContext == null) {<a name="line.5831"></a> -<span class="sourceLineNo">5832</span> throw new IllegalArgumentException("Scanner context cannot be null");<a name="line.5832"></a> -<span class="sourceLineNo">5833</span> }<a name="line.5833"></a> -<span class="sourceLineNo">5834</span> RpcCallContext rpcCall = RpcServer.getCurrentCall();<a name="line.5834"></a> -<span class="sourceLineNo">5835</span><a name="line.5835"></a> -<span class="sourceLineNo">5836</span> // Save the initial progress from the Scanner context in these local variables. The progress<a name="line.5836"></a> -<span class="sourceLineNo">5837</span> // may need to be reset a few times if rows are being filtered out so we save the initial<a name="line.5837"></a> -<span class="sourceLineNo">5838</span> // progress.<a name="line.5838"></a> -<span class="sourceLineNo">5839</span> int initialBatchProgress = scannerContext.getBatchProgress();<a name="line.5839"></a> -<span class="sourceLineNo">5840</span> long initialSizeProgress = scannerContext.getSizeProgress();<a name="line.5840"></a> -<span class="sourceLineNo">5841</span> long initialTimeProgress = scannerContext.getTimeProgress();<a name="line.5841"></a> -<span class="sourceLineNo">5842</span><a name="line.5842"></a> -<span class="sourceLineNo">5843</span> // The loop here is used only when at some point during the next we determine<a name="line.5843"></a> -<span class="sourceLineNo">5844</span> // that due to effects of filters or otherwise, we have an empty row in the result.<a name="line.5844"></a> -<span class="sourceLineNo">5845</span> // Then we loop and try again. Otherwise, we must get out on the first iteration via return,<a name="line.5845"></a> -<span class="sourceLineNo">5846</span> // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,<a name="line.5846"></a> -<span class="sourceLineNo">5847</span> // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).<a name="line.5847"></a> -<span class="sourceLineNo">5848</span> while (true) {<a name="line.5848"></a> -<span class="sourceLineNo">5849</span> // Starting to scan a new row. Reset the scanner progress according to whether or not<a name="line.5849"></a> -<span class="sourceLineNo">5850</span> // progress should be kept.<a name="line.5850"></a> -<span class="sourceLineNo">5851</span> if (scannerContext.getKeepProgress()) {<a name="line.5851"></a> -<span class="sourceLineNo">5852</span> // Progress should be kept. Reset to initial values seen at start of method invocation.<a name="line.5852"></a> -<span class="sourceLineNo">5853</span> scannerContext.setProgress(initialBatchProgress, initialSizeProgress,<a name="line.5853"></a> -<span class="sourceLineNo">5854</span> initialTimeProgress);<a name="line.5854"></a> -<span class="sourceLineNo">5855</span> } else {<a name="line.5855"></a> -<span class="sourceLineNo">5856</span> scannerContext.clearProgress();<a name="line.5856"></a> -<span class="sourceLineNo">5857</span> }<a name="line.5857"></a> -<span class="sourceLineNo">5858</span><a name="line.5858"></a> -<span class="sourceLineNo">5859</span> if (rpcCall != null) {<a name="line.5859"></a> -<span class="sourceLineNo">5860</span> // If a user specifies a too-restrictive or too-slow scanner, the<a name="line.5860"></a> -<span class="sourceLineNo">5861</span> // client might time out and disconnect while the server side<a name="line.5861"></a> -<span class="sourceLineNo">5862</span> // is still processing the request. We should abort aggressively<a name="line.5862"></a> -<span class="sourceLineNo">5863</span> // in that case.<a name="line.5863"></a> -<span class="sourceLineNo">5864</span> long afterTime = rpcCall.disconnectSince();<a name="line.5864"></a> -<span class="sourceLineNo">5865</span> if (afterTime >= 0) {<a name="line.5865"></a> -<span class="sourceLineNo">5866</span> throw new CallerDisconnectedException(<a name="line.5866"></a> -<span class="sourceLineNo">5867</span> "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +<a name="line.5867"></a> -<span class="sourceLineNo">5868</span> this + " after " + afterTime + " ms, since " +<a name="line.5868"></a> -<span class="sourceLineNo">5869</span> "caller disconnected");<a name="line.5869"></a> -<span class="sourceLineNo">5870</span> }<a name="line.5870"></a> -<span class="sourceLineNo">5871</span> }<a name="line.5871"></a> -<span class="sourceLineNo">5872</span><a name="line.5872"></a> -<span class="sourceLineNo">5873</span> // Let's see what we have in the storeHeap.<a name="line.5873"></a> -<span class="sourceLineNo">5874</span> Cell current = this.storeHeap.peek();<a name="line.5874"></a> -<span class="sourceLineNo">5875</span><a name="line.5875"></a> -<span class="sourceLineNo">5876</span> boolean stopRow = isStopRow(current);<a name="line.5876"></a> -<span class="sourceLineNo">5877</span> // When has filter row is true it means that the all the cells for a particular row must be<a name="line.5877"></a> -<span class="sourceLineNo">5878</span> // read before a filtering decision can be made. This means that filters where hasFilterRow<a name="line.5878"></a> -<span class="sourceLineNo">5879</span> // run the risk of encountering out of memory errors in the case that they are applied to a<a name="line.5879"></a> -<span class="sourceLineNo">5880</span> // table that has very large rows.<a name="line.5880"></a> -<span class="sourceLineNo">5881</span> boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();<a name="line.5881"></a> -<span class="sourceLineNo">5882</span><a name="line.5882"></a> -<span class="sourceLineNo">5883</span> // If filter#hasFilterRow is true, partial results are not allowed since allowing them<a name="line.5883"></a> -<span class="sourceLineNo">5884</span> // would prevent the filters from being evaluated. Thus, if it is true, change the<a name="line.5884"></a> -<span class="sourceLineNo">5885</span> // scope of any limits that could potentially create partial results to<a name="line.5885"></a> -<span class="sourceLineNo">5886</span> // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row<a name="line.5886"></a> -<span class="sourceLineNo">5887</span> if (hasFilterRow) {<a name="line.5887"></a> -<span class="sourceLineNo">5888</span> if (LOG.isTraceEnabled()) {<a name="line.5888"></a> -<span class="sourceLineNo">5889</span> LOG.trace("filter#hasFilterRow is true which prevents partial results from being "<a name="line.5889"></a> -<span class="sourceLineNo">5890</span> + " formed. Changing scope of limits that may create partials");<a name="line.5890"></a> -<span class="sourceLineNo">5891</span> }<a name="line.5891"></a> -<span class="sourceLineNo">5892</span> scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);<a name="line.5892"></a> -<span class="sourceLineNo">5893</span> scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);<a name="line.5893"></a> -<span class="sourceLineNo">5894</span> }<a name="line.5894"></a> -<span class="sourceLineNo">5895</span><a name="line.5895"></a> -<span class="sourceLineNo">5896</span> // Check if we were getting data from the joinedHeap and hit the limit.<a name="line.5896"></a> -<span class="sourceLineNo">5897</span> // If not, then it's main path - getting results from storeHeap.<a name="line.5897"></a> -<span class="sourceLineNo">5898</span> if (joinedContinuationRow == null) {<a name="line.5898"></a> -<span class="sourceLineNo">5899</span> // First, check if we are at a stop row. If so, there are no more results.<a name="line.5899"></a> -<span class="sourceLineNo">5900</span> if (stopRow) {<a name="line.5900"></a> -<span class="sourceLineNo">5901</span> if (hasFilterRow) {<a name="line.5901"></a> -<span class="sourceLineNo">5902</span> filter.filterRowCells(results);<a name="line.5902"></a> -<span class="sourceLineNo">5903</span> }<a name="line.5903"></a> -<span class="sourceLineNo">5904</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.5904"></a> -<span class="sourceLineNo">5905</span> }<a name="line.5905"></a> -<span class="sourceLineNo">5906</span><a name="line.5906"></a> -<span class="sourceLineNo">5907</span> // Check if rowkey filter wants to exclude this row. If so, loop to next.<a name="line.5907"></a> -<span class="sourceLineNo">5908</span> // Technically, if we hit limits before on this row, we don't need this call.<a name="line.5908"></a> -<span class="sourceLineNo">5909</span> if (filterRowKey(current)) {<a name="line.5909"></a> -<span class="sourceLineNo">5910</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.5910"></a> -<span class="sourceLineNo">5911</span> // Typically the count of rows scanned is incremented inside #populateResult. However,<a name="line.5911"></a> -<span class="sourceLineNo">5912</span> // here we are filtering a row based purely on its row key, preventing us from calling<a name="line.5912"></a> -<span class="sourceLineNo">5913</span> // #populateResult. Thus, perform the necessary increment here to rows scanned metric<a name="line.5913"></a> -<span class="sourceLineNo">5914</span> incrementCountOfRowsScannedMetric(scannerContext);<a name="line.5914"></a> -<span class="sourceLineNo">5915</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.5915"></a> -<span class="sourceLineNo">5916</span> if (!moreRows) {<a name="line.5916"></a> -<span class="sourceLineNo">5917</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.5917"></a> -<span class="sourceLineNo">5918</span> }<a name="line.5918"></a> -<span class="sourceLineNo">5919</span> results.clear();<a name="line.5919"></a> -<span class="sourceLineNo">5920</span> continue;<a name="line.5920"></a> -<span class="sourceLineNo">5921</span> }<a name="line.5921"></a> -<span class="sourceLineNo">5922</span><a name="line.5922"></a> -<span class="sourceLineNo">5923</span> // Ok, we are good, let's try to get some results from the main heap.<a name="line.5923"></a> -<span class="sourceLineNo">5924</span> populateResult(results, this.storeHeap, scannerContext, current);<a name="line.5924"></a> -<span class="sourceLineNo">5925</span><a name="line.5925"></a> -<span class="sourceLineNo">5926</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5926"></a> -<span class="sourceLineNo">5927</span> if (hasFilterRow) {<a name="line.5927"></a> -<span class="sourceLineNo">5928</span> throw new IncompatibleFilterException(<a name="line.5928"></a> -<span class="sourceLineNo">5929</span> "Filter whose hasFilterRow() returns true is incompatible with scans that must "<a name="line.5929"></a> -<span class="sourceLineNo">5930</span> + " stop mid-row because of a limit. ScannerContext:" + scannerContext);<a name="line.5930"></a> -<span class="sourceLineNo">5931</span> }<a name="line.5931"></a> -<span class="sourceLineNo">5932</span> return true;<a name="line.5932"></a> -<span class="sourceLineNo">5933</span> }<a name="line.5933"></a> -<span class="sourceLineNo">5934</span><a name="line.5934"></a> -<span class="sourceLineNo">5935</span> Cell nextKv = this.storeHeap.peek();<a name="line.5935"></a> -<span class="sourceLineNo">5936</span> stopRow = nextKv == null || isStopRow(nextKv);<a name="line.5936"></a> -<span class="sourceLineNo">5937</span> // save that the row was empty before filters applied to it.<a name="line.5937"></a> -<span class="sourceLineNo">5938</span> final boolean isEmptyRow = results.isEmpty();<a name="line.5938"></a> -<span class="sourceLineNo">5939</span><a name="line.5939"></a> -<span class="sourceLineNo">5940</span> // We have the part of the row necessary for filtering (all of it, usually).<a name="line.5940"></a> -<span class="sourceLineNo">5941</span> // First filter with the filterRow(List).<a name="line.5941"></a> -<span class="sourceLineNo">5942</span> FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;<a name="line.5942"></a> -<span class="sourceLineNo">5943</span> if (hasFilterRow) {<a name="line.5943"></a> -<span class="sourceLineNo">5944</span> ret = filter.filterRowCellsWithRet(results);<a name="line.5944"></a> -<span class="sourceLineNo">5945</span><a name="line.5945"></a> -<span class="sourceLineNo">5946</span> // We don't know how the results have changed after being filtered. Must set progress<a name="line.5946"></a> -<span class="sourceLineNo">5947</span> // according to contents of results now. However, a change in the results should not<a name="line.5947"></a> -<span class="sourceLineNo">5948</span> // affect the time progress. Thus preserve whatever time progress has been made<a name="line.5948"></a> -<span class="sourceLineNo">5949</span> long timeProgress = scannerContext.getTimeProgress();<a name="line.5949"></a> -<span class="sourceLineNo">5950</span> if (scannerContext.getKeepProgress()) {<a name="line.5950"></a> -<span class="sourceLineNo">5951</span> scannerContext.setProgress(initialBatchProgress, initialSizeProgress,<a name="line.5951"></a> -<span class="sourceLineNo">5952</span> initialTimeProgress);<a name="line.5952"></a> -<span class="sourceLineNo">5953</span> } else {<a name="line.5953"></a> -<span class="sourceLineNo">5954</span> scannerContext.clearProgress();<a name="line.5954"></a> -<span class="sourceLineNo">5955</span> }<a name="line.5955"></a> -<span class="sourceLineNo">5956</span> scannerContext.setTimeProgress(timeProgress);<a name="line.5956"></a> -<span class="sourceLineNo">5957</span> scannerContext.incrementBatchProgress(results.size());<a name="line.5957"></a> -<span class="sourceLineNo">5958</span> for (Cell cell : results) {<a name="line.5958"></a> -<span class="sourceLineNo">5959</span> scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));<a name="line.5959"></a> -<span class="sourceLineNo">5960</span> }<a name="line.5960"></a> -<span class="sourceLineNo">5961</span> }<a name="line.5961"></a> -<span class="sourceLineNo">5962</span><a name="line.5962"></a> -<span class="sourceLineNo">5963</span> if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {<a name="line.5963"></a> -<span class="sourceLineNo">5964</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.5964"></a> -<span class="sourceLineNo">5965</span> results.clear();<a name="line.5965"></a> -<span class="sourceLineNo">5966</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.5966"></a> -<span class="sourceLineNo">5967</span> if (!moreRows) {<a name="line.5967"></a> -<span class="sourceLineNo">5968</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.5968"></a> -<span class="sourceLineNo">5969</span> }<a name="line.5969"></a> -<span class="sourceLineNo">5970</span><a name="line.5970"></a> -<span class="sourceLineNo">5971</span> // This row was totally filtered out, if this is NOT the last row,<a name="line.5971"></a> -<span class="sourceLineNo">5972</span> // we should continue on. Otherwise, nothing else to do.<a name="line.5972"></a> -<span class="sourceLineNo">5973</span> if (!stopRow) continue;<a name="line.5973"></a> -<span class="sourceLineNo">5974</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.5974"></a> -<span class="sourceLineNo">5975</span> }<a name="line.5975"></a> -<span class="sourceLineNo">5976</span><a name="line.5976"></a> -<span class="sourceLineNo">5977</span> // Ok, we are done with storeHeap for this row.<a name="line.5977"></a> -<span class="sourceLineNo">5978</span> // Now we may need to fetch additional, non-essential data into row.<a name="line.5978"></a> -<span class="sourceLineNo">5979</span> // These values are not needed for filter to work, so we postpone their<a name="line.5979"></a> -<span class="sourceLineNo">5980</span> // fetch to (possibly) reduce amount of data loads from disk.<a name="line.5980"></a> -<span class="sourceLineNo">5981</span> if (this.joinedHeap != null) {<a name="line.5981"></a> -<span class="sourceLineNo">5982</span> boolean mayHaveData = joinedHeapMayHaveData(current);<a name="line.5982"></a> -<span class="sourceLineNo">5983</span> if (mayHaveData) {<a name="line.5983"></a> -<span class="sourceLineNo">5984</span> joinedContinuationRow = current;<a name="line.5984"></a> -<span class="sourceLineNo">5985</span> populateFromJoinedHeap(results, scannerContext);<a name="line.5985"></a> -<span class="sourceLineNo">5986</span><a name="line.5986"></a> -<span class="sourceLineNo">5987</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5987"></a> -<span class="sourceLineNo">5988</span> return true;<a name="line.5988"></a> -<span class="sourceLineNo">5989</span> }<a name="line.5989"></a> -<span class="sourceLineNo">5990</span> }<a name="line.5990"></a> -<span class="sourceLineNo">5991</span> }<a name="line.5991"></a> -<span class="sourceLineNo">5992</span> } else {<a name="line.5992"></a> -<span class="sourceLineNo">5993</span> // Populating from the joined heap was stopped by limits, populate some more.<a name="line.5993"></a> -<span class="sourceLineNo">5994</span> populateFromJoinedHeap(results, scannerContext);<a name="line.5994"></a> -<span class="sourceLineNo">5995</span> if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5995"></a> -<span class="sourceLineNo">5996</span> return true;<a name="line.5996"></a> -<span class="sourceLineNo">5997</span> }<a name="line.5997"></a> -<span class="sourceLineNo">5998</span> }<a name="line.5998"></a> -<span class="sourceLineNo">5999</span> // We may have just called populateFromJoinedMap and hit the limits. If that is<a name="line.5999"></a> -<span class="sourceLineNo">6000</span> // the case, we need to call it again on the next next() invocation.<a name="line.6000"></a> -<span class="sourceLineNo">6001</span> if (joinedContinuationRow != null) {<a name="line.6001"></a> -<span class="sourceLineNo">6002</span> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();<a name="line.6002"></a> -<span class="sourceLineNo">6003</span> }<a name="line.6003"></a> -<span class="sourceLineNo">6004</span><a name="line.6004"></a> -<span class="sourceLineNo">6005</span> // Finally, we are done with both joinedHeap and storeHeap.<a name="line.6005"></a> -<span class="sourceLineNo">6006</span> // Double check to prevent empty rows from appearing in result. It could be<a name="line.6006"></a> -<span class="sourceLineNo">6007</span> // the case when SingleColumnValueExcludeFilter is used.<a name="line.6007"></a> -<span class="sourceLineNo">6008</span> if (results.isEmpty()) {<a name="line.6008"></a> -<span class="sourceLineNo">6009</span> incrementCountOfRowsFilteredMetric(scannerContext);<a name="line.6009"></a> -<span class="sourceLineNo">6010</span> boolean moreRows = nextRow(scannerContext, current);<a name="line.6010"></a> -<span class="sourceLineNo">6011</span> if (!moreRows) {<a name="line.6011"></a> -<span class="sourceLineNo">6012</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6012"></a> -<span class="sourceLineNo">6013</span> }<a name="line.6013"></a> -<span class="sourceLineNo">6014</span> if (!stopRow) continue;<a name="line.6014"></a> -<span class="sourceLineNo">6015</span> }<a name="line.6015"></a> -<span class="sourceLineNo">6016</span><a name="line.6016"></a> -<span class="sourceLineNo">6017</span> if (stopRow) {<a name="line.6017"></a> -<span class="sourceLineNo">6018</span> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();<a name="line.6018"></a> -<span class="sourceLineNo">6019</span> } else {<a name="line.6019"></a> -<span class="sourceLineNo">6020</span> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();<a name="line.6020"></a> -<span class="sourceLineNo">6021</span> }<a name="line.6021"></a> -<span class="sourceLineNo">6022</span> }<a name="line.6022"></a> -<span class="sourceLineNo">6023</span> }<a name="line.6023"></a> -<span class="sourceLineNo">6024</span><a name="line.6024"></a> -<span class="sourceLineNo">6025</span> protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {<a name="line.6025"></a> -<span class="sourceLineNo">6026</span> if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;<a name="line.6026"></a> -<span class="sourceLineNo">6027</span><a name="line.6027"></a> -<span class="sourceLineNo">6028</span> scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();<a name="line.6028"></a> -<span class="sourceLineNo">6029</span> }<a name="line.6029"></a> -<span class="sourceLineNo">6030</span><a name="line.6030"></a> -<span class="sourceLineNo">6031</span> protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {<a name="line.6031"></a> -<span class="sourceLineNo">6032</span> if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;<a name="line.6032"></a> -<span class="sourceLineNo">6033</span><a name="line.6033"></a> -<span class="sourceLineNo">6034</span> scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();<a name="line.6034"></a> -<span class="sourceLineNo">6035</span> }<a name="line.6035"></a> -<span class="sourceLineNo">6036</span><a name="lin
<TRUNCATED>