http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e11cf2cb/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrContext.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrContext.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrContext.html index fdc5a8a..62e604e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrContext.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrContext.html @@ -320,141 +320,143 @@ <span class="sourceLineNo">312</span> this.cellDecoder = codec.getDecoder(this.inputStream);<a name="line.312"></a> <span class="sourceLineNo">313</span> if (this.hasCompression) {<a name="line.313"></a> <span class="sourceLineNo">314</span> this.byteStringUncompressor = codec.getByteStringUncompressor();<a name="line.314"></a> -<span class="sourceLineNo">315</span> }<a name="line.315"></a> -<span class="sourceLineNo">316</span> }<a name="line.316"></a> -<span class="sourceLineNo">317</span><a name="line.317"></a> -<span class="sourceLineNo">318</span> @Override<a name="line.318"></a> -<span class="sourceLineNo">319</span> protected boolean hasCompression() {<a name="line.319"></a> -<span class="sourceLineNo">320</span> return this.hasCompression;<a name="line.320"></a> -<span class="sourceLineNo">321</span> }<a name="line.321"></a> -<span class="sourceLineNo">322</span><a name="line.322"></a> -<span class="sourceLineNo">323</span> @Override<a name="line.323"></a> -<span class="sourceLineNo">324</span> protected boolean hasTagCompression() {<a name="line.324"></a> -<span class="sourceLineNo">325</span> return this.hasTagCompression;<a name="line.325"></a> -<span class="sourceLineNo">326</span> }<a name="line.326"></a> -<span class="sourceLineNo">327</span><a name="line.327"></a> -<span class="sourceLineNo">328</span> @Override<a name="line.328"></a> -<span class="sourceLineNo">329</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.329"></a> -<span class="sourceLineNo">330</span> while (true) {<a name="line.330"></a> -<span class="sourceLineNo">331</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.331"></a> -<span class="sourceLineNo">332</span> long originalPosition = this.inputStream.getPos();<a name="line.332"></a> -<span class="sourceLineNo">333</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.333"></a> -<span class="sourceLineNo">334</span> if (LOG.isTraceEnabled()) {<a name="line.334"></a> -<span class="sourceLineNo">335</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.335"></a> -<span class="sourceLineNo">336</span> }<a name="line.336"></a> -<span class="sourceLineNo">337</span> return false;<a name="line.337"></a> -<span class="sourceLineNo">338</span> }<a name="line.338"></a> -<span class="sourceLineNo">339</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.339"></a> -<span class="sourceLineNo">340</span> long size = 0;<a name="line.340"></a> -<span class="sourceLineNo">341</span> try {<a name="line.341"></a> -<span class="sourceLineNo">342</span> long available = -1;<a name="line.342"></a> -<span class="sourceLineNo">343</span> try {<a name="line.343"></a> -<span class="sourceLineNo">344</span> int firstByte = this.inputStream.read();<a name="line.344"></a> -<span class="sourceLineNo">345</span> if (firstByte == -1) {<a name="line.345"></a> -<span class="sourceLineNo">346</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.346"></a> -<span class="sourceLineNo">347</span> }<a name="line.347"></a> -<span class="sourceLineNo">348</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.348"></a> -<span class="sourceLineNo">349</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.349"></a> -<span class="sourceLineNo">350</span> available = this.inputStream.available();<a name="line.350"></a> -<span class="sourceLineNo">351</span> if (available > 0 && available < size) {<a name="line.351"></a> -<span class="sourceLineNo">352</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.352"></a> -<span class="sourceLineNo">353</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.353"></a> -<span class="sourceLineNo">354</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.354"></a> -<span class="sourceLineNo">355</span> }<a name="line.355"></a> -<span class="sourceLineNo">356</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.356"></a> -<span class="sourceLineNo">357</span> (int)size);<a name="line.357"></a> -<span class="sourceLineNo">358</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.358"></a> -<span class="sourceLineNo">359</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.359"></a> -<span class="sourceLineNo">360</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.360"></a> -<span class="sourceLineNo">361</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.361"></a> -<span class="sourceLineNo">362</span> }<a name="line.362"></a> -<span class="sourceLineNo">363</span> if (!builder.isInitialized()) {<a name="line.363"></a> -<span class="sourceLineNo">364</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.364"></a> -<span class="sourceLineNo">365</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.365"></a> -<span class="sourceLineNo">366</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.366"></a> -<span class="sourceLineNo">367</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.367"></a> -<span class="sourceLineNo">368</span> }<a name="line.368"></a> -<span class="sourceLineNo">369</span> WALKey walKey = builder.build();<a name="line.369"></a> -<span class="sourceLineNo">370</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.370"></a> -<span class="sourceLineNo">371</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.371"></a> -<span class="sourceLineNo">372</span> if (LOG.isTraceEnabled()) {<a name="line.372"></a> -<span class="sourceLineNo">373</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.373"></a> -<span class="sourceLineNo">374</span> this.inputStream.getPos());<a name="line.374"></a> -<span class="sourceLineNo">375</span> }<a name="line.375"></a> -<span class="sourceLineNo">376</span> continue;<a name="line.376"></a> -<span class="sourceLineNo">377</span> }<a name="line.377"></a> -<span class="sourceLineNo">378</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.378"></a> -<span class="sourceLineNo">379</span> long posBefore = this.inputStream.getPos();<a name="line.379"></a> -<span class="sourceLineNo">380</span> try {<a name="line.380"></a> -<span class="sourceLineNo">381</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.381"></a> -<span class="sourceLineNo">382</span> if (expectedCells != actualCells) {<a name="line.382"></a> -<span class="sourceLineNo">383</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.383"></a> -<span class="sourceLineNo">384</span> }<a name="line.384"></a> -<span class="sourceLineNo">385</span> } catch (Exception ex) {<a name="line.385"></a> -<span class="sourceLineNo">386</span> String posAfterStr = "<unknown>";<a name="line.386"></a> -<span class="sourceLineNo">387</span> try {<a name="line.387"></a> -<span class="sourceLineNo">388</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.388"></a> -<span class="sourceLineNo">389</span> } catch (Throwable t) {<a name="line.389"></a> -<span class="sourceLineNo">390</span> if (LOG.isTraceEnabled()) {<a name="line.390"></a> -<span class="sourceLineNo">391</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.391"></a> -<span class="sourceLineNo">392</span> }<a name="line.392"></a> -<span class="sourceLineNo">393</span> }<a name="line.393"></a> -<span class="sourceLineNo">394</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.394"></a> -<span class="sourceLineNo">395</span> + posBefore + " and read up to " + posAfterStr;<a name="line.395"></a> -<span class="sourceLineNo">396</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.396"></a> -<span class="sourceLineNo">397</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.397"></a> -<span class="sourceLineNo">398</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.398"></a> -<span class="sourceLineNo">399</span> }<a name="line.399"></a> -<span class="sourceLineNo">400</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.400"></a> -<span class="sourceLineNo">401</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.401"></a> -<span class="sourceLineNo">402</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.402"></a> -<span class="sourceLineNo">403</span> + this.walEditsStopOffset);<a name="line.403"></a> -<span class="sourceLineNo">404</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.404"></a> -<span class="sourceLineNo">405</span> }<a name="line.405"></a> -<span class="sourceLineNo">406</span> } catch (EOFException eof) {<a name="line.406"></a> -<span class="sourceLineNo">407</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.407"></a> -<span class="sourceLineNo">408</span> if (originalPosition < 0) {<a name="line.408"></a> -<span class="sourceLineNo">409</span> if (LOG.isTraceEnabled()) {<a name="line.409"></a> -<span class="sourceLineNo">410</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.410"></a> -<span class="sourceLineNo">411</span> }<a name="line.411"></a> -<span class="sourceLineNo">412</span> throw eof;<a name="line.412"></a> -<span class="sourceLineNo">413</span> }<a name="line.413"></a> -<span class="sourceLineNo">414</span> // Else restore our position to original location in hope that next time through we will<a name="line.414"></a> -<span class="sourceLineNo">415</span> // read successfully.<a name="line.415"></a> -<span class="sourceLineNo">416</span> if (LOG.isTraceEnabled()) {<a name="line.416"></a> -<span class="sourceLineNo">417</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.417"></a> -<span class="sourceLineNo">418</span> }<a name="line.418"></a> -<span class="sourceLineNo">419</span> seekOnFs(originalPosition);<a name="line.419"></a> -<span class="sourceLineNo">420</span> return false;<a name="line.420"></a> -<span class="sourceLineNo">421</span> }<a name="line.421"></a> -<span class="sourceLineNo">422</span> return true;<a name="line.422"></a> -<span class="sourceLineNo">423</span> }<a name="line.423"></a> -<span class="sourceLineNo">424</span> }<a name="line.424"></a> -<span class="sourceLineNo">425</span><a name="line.425"></a> -<span class="sourceLineNo">426</span> private IOException extractHiddenEof(Exception ex) {<a name="line.426"></a> -<span class="sourceLineNo">427</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.427"></a> -<span class="sourceLineNo">428</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.428"></a> -<span class="sourceLineNo">429</span> IOException ioEx = null;<a name="line.429"></a> -<span class="sourceLineNo">430</span> if (ex instanceof EOFException) {<a name="line.430"></a> -<span class="sourceLineNo">431</span> return (EOFException)ex;<a name="line.431"></a> -<span class="sourceLineNo">432</span> } else if (ex instanceof IOException) {<a name="line.432"></a> -<span class="sourceLineNo">433</span> ioEx = (IOException)ex;<a name="line.433"></a> -<span class="sourceLineNo">434</span> } else if (ex instanceof RuntimeException<a name="line.434"></a> -<span class="sourceLineNo">435</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.435"></a> -<span class="sourceLineNo">436</span> ioEx = (IOException)ex.getCause();<a name="line.436"></a> -<span class="sourceLineNo">437</span> }<a name="line.437"></a> -<span class="sourceLineNo">438</span> if (ioEx != null) {<a name="line.438"></a> -<span class="sourceLineNo">439</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.439"></a> -<span class="sourceLineNo">440</span> return null;<a name="line.440"></a> -<span class="sourceLineNo">441</span> }<a name="line.441"></a> -<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> -<span class="sourceLineNo">443</span> }<a name="line.443"></a> -<span class="sourceLineNo">444</span><a name="line.444"></a> -<span class="sourceLineNo">445</span> @Override<a name="line.445"></a> -<span class="sourceLineNo">446</span> protected void seekOnFs(long pos) throws IOException {<a name="line.446"></a> -<span class="sourceLineNo">447</span> this.inputStream.seek(pos);<a name="line.447"></a> -<span class="sourceLineNo">448</span> }<a name="line.448"></a> -<span class="sourceLineNo">449</span>}<a name="line.449"></a> +<span class="sourceLineNo">315</span> } else {<a name="line.315"></a> +<span class="sourceLineNo">316</span> this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();<a name="line.316"></a> +<span class="sourceLineNo">317</span> }<a name="line.317"></a> +<span class="sourceLineNo">318</span> }<a name="line.318"></a> +<span class="sourceLineNo">319</span><a name="line.319"></a> +<span class="sourceLineNo">320</span> @Override<a name="line.320"></a> +<span class="sourceLineNo">321</span> protected boolean hasCompression() {<a name="line.321"></a> +<span class="sourceLineNo">322</span> return this.hasCompression;<a name="line.322"></a> +<span class="sourceLineNo">323</span> }<a name="line.323"></a> +<span class="sourceLineNo">324</span><a name="line.324"></a> +<span class="sourceLineNo">325</span> @Override<a name="line.325"></a> +<span class="sourceLineNo">326</span> protected boolean hasTagCompression() {<a name="line.326"></a> +<span class="sourceLineNo">327</span> return this.hasTagCompression;<a name="line.327"></a> +<span class="sourceLineNo">328</span> }<a name="line.328"></a> +<span class="sourceLineNo">329</span><a name="line.329"></a> +<span class="sourceLineNo">330</span> @Override<a name="line.330"></a> +<span class="sourceLineNo">331</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.331"></a> +<span class="sourceLineNo">332</span> while (true) {<a name="line.332"></a> +<span class="sourceLineNo">333</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.333"></a> +<span class="sourceLineNo">334</span> long originalPosition = this.inputStream.getPos();<a name="line.334"></a> +<span class="sourceLineNo">335</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.335"></a> +<span class="sourceLineNo">336</span> if (LOG.isTraceEnabled()) {<a name="line.336"></a> +<span class="sourceLineNo">337</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.337"></a> +<span class="sourceLineNo">338</span> }<a name="line.338"></a> +<span class="sourceLineNo">339</span> return false;<a name="line.339"></a> +<span class="sourceLineNo">340</span> }<a name="line.340"></a> +<span class="sourceLineNo">341</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.341"></a> +<span class="sourceLineNo">342</span> long size = 0;<a name="line.342"></a> +<span class="sourceLineNo">343</span> try {<a name="line.343"></a> +<span class="sourceLineNo">344</span> long available = -1;<a name="line.344"></a> +<span class="sourceLineNo">345</span> try {<a name="line.345"></a> +<span class="sourceLineNo">346</span> int firstByte = this.inputStream.read();<a name="line.346"></a> +<span class="sourceLineNo">347</span> if (firstByte == -1) {<a name="line.347"></a> +<span class="sourceLineNo">348</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.348"></a> +<span class="sourceLineNo">349</span> }<a name="line.349"></a> +<span class="sourceLineNo">350</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.350"></a> +<span class="sourceLineNo">351</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.351"></a> +<span class="sourceLineNo">352</span> available = this.inputStream.available();<a name="line.352"></a> +<span class="sourceLineNo">353</span> if (available > 0 && available < size) {<a name="line.353"></a> +<span class="sourceLineNo">354</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.354"></a> +<span class="sourceLineNo">355</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.355"></a> +<span class="sourceLineNo">356</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.356"></a> +<span class="sourceLineNo">357</span> }<a name="line.357"></a> +<span class="sourceLineNo">358</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.358"></a> +<span class="sourceLineNo">359</span> (int)size);<a name="line.359"></a> +<span class="sourceLineNo">360</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.360"></a> +<span class="sourceLineNo">361</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.361"></a> +<span class="sourceLineNo">362</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.362"></a> +<span class="sourceLineNo">363</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.363"></a> +<span class="sourceLineNo">364</span> }<a name="line.364"></a> +<span class="sourceLineNo">365</span> if (!builder.isInitialized()) {<a name="line.365"></a> +<span class="sourceLineNo">366</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.366"></a> +<span class="sourceLineNo">367</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.367"></a> +<span class="sourceLineNo">368</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.368"></a> +<span class="sourceLineNo">369</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.369"></a> +<span class="sourceLineNo">370</span> }<a name="line.370"></a> +<span class="sourceLineNo">371</span> WALKey walKey = builder.build();<a name="line.371"></a> +<span class="sourceLineNo">372</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.372"></a> +<span class="sourceLineNo">373</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.373"></a> +<span class="sourceLineNo">374</span> if (LOG.isTraceEnabled()) {<a name="line.374"></a> +<span class="sourceLineNo">375</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.375"></a> +<span class="sourceLineNo">376</span> this.inputStream.getPos());<a name="line.376"></a> +<span class="sourceLineNo">377</span> }<a name="line.377"></a> +<span class="sourceLineNo">378</span> continue;<a name="line.378"></a> +<span class="sourceLineNo">379</span> }<a name="line.379"></a> +<span class="sourceLineNo">380</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.380"></a> +<span class="sourceLineNo">381</span> long posBefore = this.inputStream.getPos();<a name="line.381"></a> +<span class="sourceLineNo">382</span> try {<a name="line.382"></a> +<span class="sourceLineNo">383</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.383"></a> +<span class="sourceLineNo">384</span> if (expectedCells != actualCells) {<a name="line.384"></a> +<span class="sourceLineNo">385</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.385"></a> +<span class="sourceLineNo">386</span> }<a name="line.386"></a> +<span class="sourceLineNo">387</span> } catch (Exception ex) {<a name="line.387"></a> +<span class="sourceLineNo">388</span> String posAfterStr = "<unknown>";<a name="line.388"></a> +<span class="sourceLineNo">389</span> try {<a name="line.389"></a> +<span class="sourceLineNo">390</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.390"></a> +<span class="sourceLineNo">391</span> } catch (Throwable t) {<a name="line.391"></a> +<span class="sourceLineNo">392</span> if (LOG.isTraceEnabled()) {<a name="line.392"></a> +<span class="sourceLineNo">393</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.393"></a> +<span class="sourceLineNo">394</span> }<a name="line.394"></a> +<span class="sourceLineNo">395</span> }<a name="line.395"></a> +<span class="sourceLineNo">396</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.396"></a> +<span class="sourceLineNo">397</span> + posBefore + " and read up to " + posAfterStr;<a name="line.397"></a> +<span class="sourceLineNo">398</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.398"></a> +<span class="sourceLineNo">399</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.399"></a> +<span class="sourceLineNo">400</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.400"></a> +<span class="sourceLineNo">401</span> }<a name="line.401"></a> +<span class="sourceLineNo">402</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.402"></a> +<span class="sourceLineNo">403</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.403"></a> +<span class="sourceLineNo">404</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.404"></a> +<span class="sourceLineNo">405</span> + this.walEditsStopOffset);<a name="line.405"></a> +<span class="sourceLineNo">406</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.406"></a> +<span class="sourceLineNo">407</span> }<a name="line.407"></a> +<span class="sourceLineNo">408</span> } catch (EOFException eof) {<a name="line.408"></a> +<span class="sourceLineNo">409</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.409"></a> +<span class="sourceLineNo">410</span> if (originalPosition < 0) {<a name="line.410"></a> +<span class="sourceLineNo">411</span> if (LOG.isTraceEnabled()) {<a name="line.411"></a> +<span class="sourceLineNo">412</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.412"></a> +<span class="sourceLineNo">413</span> }<a name="line.413"></a> +<span class="sourceLineNo">414</span> throw eof;<a name="line.414"></a> +<span class="sourceLineNo">415</span> }<a name="line.415"></a> +<span class="sourceLineNo">416</span> // Else restore our position to original location in hope that next time through we will<a name="line.416"></a> +<span class="sourceLineNo">417</span> // read successfully.<a name="line.417"></a> +<span class="sourceLineNo">418</span> if (LOG.isTraceEnabled()) {<a name="line.418"></a> +<span class="sourceLineNo">419</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.419"></a> +<span class="sourceLineNo">420</span> }<a name="line.420"></a> +<span class="sourceLineNo">421</span> seekOnFs(originalPosition);<a name="line.421"></a> +<span class="sourceLineNo">422</span> return false;<a name="line.422"></a> +<span class="sourceLineNo">423</span> }<a name="line.423"></a> +<span class="sourceLineNo">424</span> return true;<a name="line.424"></a> +<span class="sourceLineNo">425</span> }<a name="line.425"></a> +<span class="sourceLineNo">426</span> }<a name="line.426"></a> +<span class="sourceLineNo">427</span><a name="line.427"></a> +<span class="sourceLineNo">428</span> private IOException extractHiddenEof(Exception ex) {<a name="line.428"></a> +<span class="sourceLineNo">429</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.429"></a> +<span class="sourceLineNo">430</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.430"></a> +<span class="sourceLineNo">431</span> IOException ioEx = null;<a name="line.431"></a> +<span class="sourceLineNo">432</span> if (ex instanceof EOFException) {<a name="line.432"></a> +<span class="sourceLineNo">433</span> return (EOFException)ex;<a name="line.433"></a> +<span class="sourceLineNo">434</span> } else if (ex instanceof IOException) {<a name="line.434"></a> +<span class="sourceLineNo">435</span> ioEx = (IOException)ex;<a name="line.435"></a> +<span class="sourceLineNo">436</span> } else if (ex instanceof RuntimeException<a name="line.436"></a> +<span class="sourceLineNo">437</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.437"></a> +<span class="sourceLineNo">438</span> ioEx = (IOException)ex.getCause();<a name="line.438"></a> +<span class="sourceLineNo">439</span> }<a name="line.439"></a> +<span class="sourceLineNo">440</span> if (ioEx != null) {<a name="line.440"></a> +<span class="sourceLineNo">441</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.441"></a> +<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> +<span class="sourceLineNo">443</span> }<a name="line.443"></a> +<span class="sourceLineNo">444</span> return null;<a name="line.444"></a> +<span class="sourceLineNo">445</span> }<a name="line.445"></a> +<span class="sourceLineNo">446</span><a name="line.446"></a> +<span class="sourceLineNo">447</span> @Override<a name="line.447"></a> +<span class="sourceLineNo">448</span> protected void seekOnFs(long pos) throws IOException {<a name="line.448"></a> +<span class="sourceLineNo">449</span> this.inputStream.seek(pos);<a name="line.449"></a> +<span class="sourceLineNo">450</span> }<a name="line.450"></a> +<span class="sourceLineNo">451</span>}<a name="line.451"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e11cf2cb/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrResult.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrResult.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrResult.html index fdc5a8a..62e604e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrResult.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.WALHdrResult.html @@ -320,141 +320,143 @@ <span class="sourceLineNo">312</span> this.cellDecoder = codec.getDecoder(this.inputStream);<a name="line.312"></a> <span class="sourceLineNo">313</span> if (this.hasCompression) {<a name="line.313"></a> <span class="sourceLineNo">314</span> this.byteStringUncompressor = codec.getByteStringUncompressor();<a name="line.314"></a> -<span class="sourceLineNo">315</span> }<a name="line.315"></a> -<span class="sourceLineNo">316</span> }<a name="line.316"></a> -<span class="sourceLineNo">317</span><a name="line.317"></a> -<span class="sourceLineNo">318</span> @Override<a name="line.318"></a> -<span class="sourceLineNo">319</span> protected boolean hasCompression() {<a name="line.319"></a> -<span class="sourceLineNo">320</span> return this.hasCompression;<a name="line.320"></a> -<span class="sourceLineNo">321</span> }<a name="line.321"></a> -<span class="sourceLineNo">322</span><a name="line.322"></a> -<span class="sourceLineNo">323</span> @Override<a name="line.323"></a> -<span class="sourceLineNo">324</span> protected boolean hasTagCompression() {<a name="line.324"></a> -<span class="sourceLineNo">325</span> return this.hasTagCompression;<a name="line.325"></a> -<span class="sourceLineNo">326</span> }<a name="line.326"></a> -<span class="sourceLineNo">327</span><a name="line.327"></a> -<span class="sourceLineNo">328</span> @Override<a name="line.328"></a> -<span class="sourceLineNo">329</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.329"></a> -<span class="sourceLineNo">330</span> while (true) {<a name="line.330"></a> -<span class="sourceLineNo">331</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.331"></a> -<span class="sourceLineNo">332</span> long originalPosition = this.inputStream.getPos();<a name="line.332"></a> -<span class="sourceLineNo">333</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.333"></a> -<span class="sourceLineNo">334</span> if (LOG.isTraceEnabled()) {<a name="line.334"></a> -<span class="sourceLineNo">335</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.335"></a> -<span class="sourceLineNo">336</span> }<a name="line.336"></a> -<span class="sourceLineNo">337</span> return false;<a name="line.337"></a> -<span class="sourceLineNo">338</span> }<a name="line.338"></a> -<span class="sourceLineNo">339</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.339"></a> -<span class="sourceLineNo">340</span> long size = 0;<a name="line.340"></a> -<span class="sourceLineNo">341</span> try {<a name="line.341"></a> -<span class="sourceLineNo">342</span> long available = -1;<a name="line.342"></a> -<span class="sourceLineNo">343</span> try {<a name="line.343"></a> -<span class="sourceLineNo">344</span> int firstByte = this.inputStream.read();<a name="line.344"></a> -<span class="sourceLineNo">345</span> if (firstByte == -1) {<a name="line.345"></a> -<span class="sourceLineNo">346</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.346"></a> -<span class="sourceLineNo">347</span> }<a name="line.347"></a> -<span class="sourceLineNo">348</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.348"></a> -<span class="sourceLineNo">349</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.349"></a> -<span class="sourceLineNo">350</span> available = this.inputStream.available();<a name="line.350"></a> -<span class="sourceLineNo">351</span> if (available > 0 && available < size) {<a name="line.351"></a> -<span class="sourceLineNo">352</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.352"></a> -<span class="sourceLineNo">353</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.353"></a> -<span class="sourceLineNo">354</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.354"></a> -<span class="sourceLineNo">355</span> }<a name="line.355"></a> -<span class="sourceLineNo">356</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.356"></a> -<span class="sourceLineNo">357</span> (int)size);<a name="line.357"></a> -<span class="sourceLineNo">358</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.358"></a> -<span class="sourceLineNo">359</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.359"></a> -<span class="sourceLineNo">360</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.360"></a> -<span class="sourceLineNo">361</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.361"></a> -<span class="sourceLineNo">362</span> }<a name="line.362"></a> -<span class="sourceLineNo">363</span> if (!builder.isInitialized()) {<a name="line.363"></a> -<span class="sourceLineNo">364</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.364"></a> -<span class="sourceLineNo">365</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.365"></a> -<span class="sourceLineNo">366</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.366"></a> -<span class="sourceLineNo">367</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.367"></a> -<span class="sourceLineNo">368</span> }<a name="line.368"></a> -<span class="sourceLineNo">369</span> WALKey walKey = builder.build();<a name="line.369"></a> -<span class="sourceLineNo">370</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.370"></a> -<span class="sourceLineNo">371</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.371"></a> -<span class="sourceLineNo">372</span> if (LOG.isTraceEnabled()) {<a name="line.372"></a> -<span class="sourceLineNo">373</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.373"></a> -<span class="sourceLineNo">374</span> this.inputStream.getPos());<a name="line.374"></a> -<span class="sourceLineNo">375</span> }<a name="line.375"></a> -<span class="sourceLineNo">376</span> continue;<a name="line.376"></a> -<span class="sourceLineNo">377</span> }<a name="line.377"></a> -<span class="sourceLineNo">378</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.378"></a> -<span class="sourceLineNo">379</span> long posBefore = this.inputStream.getPos();<a name="line.379"></a> -<span class="sourceLineNo">380</span> try {<a name="line.380"></a> -<span class="sourceLineNo">381</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.381"></a> -<span class="sourceLineNo">382</span> if (expectedCells != actualCells) {<a name="line.382"></a> -<span class="sourceLineNo">383</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.383"></a> -<span class="sourceLineNo">384</span> }<a name="line.384"></a> -<span class="sourceLineNo">385</span> } catch (Exception ex) {<a name="line.385"></a> -<span class="sourceLineNo">386</span> String posAfterStr = "<unknown>";<a name="line.386"></a> -<span class="sourceLineNo">387</span> try {<a name="line.387"></a> -<span class="sourceLineNo">388</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.388"></a> -<span class="sourceLineNo">389</span> } catch (Throwable t) {<a name="line.389"></a> -<span class="sourceLineNo">390</span> if (LOG.isTraceEnabled()) {<a name="line.390"></a> -<span class="sourceLineNo">391</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.391"></a> -<span class="sourceLineNo">392</span> }<a name="line.392"></a> -<span class="sourceLineNo">393</span> }<a name="line.393"></a> -<span class="sourceLineNo">394</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.394"></a> -<span class="sourceLineNo">395</span> + posBefore + " and read up to " + posAfterStr;<a name="line.395"></a> -<span class="sourceLineNo">396</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.396"></a> -<span class="sourceLineNo">397</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.397"></a> -<span class="sourceLineNo">398</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.398"></a> -<span class="sourceLineNo">399</span> }<a name="line.399"></a> -<span class="sourceLineNo">400</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.400"></a> -<span class="sourceLineNo">401</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.401"></a> -<span class="sourceLineNo">402</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.402"></a> -<span class="sourceLineNo">403</span> + this.walEditsStopOffset);<a name="line.403"></a> -<span class="sourceLineNo">404</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.404"></a> -<span class="sourceLineNo">405</span> }<a name="line.405"></a> -<span class="sourceLineNo">406</span> } catch (EOFException eof) {<a name="line.406"></a> -<span class="sourceLineNo">407</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.407"></a> -<span class="sourceLineNo">408</span> if (originalPosition < 0) {<a name="line.408"></a> -<span class="sourceLineNo">409</span> if (LOG.isTraceEnabled()) {<a name="line.409"></a> -<span class="sourceLineNo">410</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.410"></a> -<span class="sourceLineNo">411</span> }<a name="line.411"></a> -<span class="sourceLineNo">412</span> throw eof;<a name="line.412"></a> -<span class="sourceLineNo">413</span> }<a name="line.413"></a> -<span class="sourceLineNo">414</span> // Else restore our position to original location in hope that next time through we will<a name="line.414"></a> -<span class="sourceLineNo">415</span> // read successfully.<a name="line.415"></a> -<span class="sourceLineNo">416</span> if (LOG.isTraceEnabled()) {<a name="line.416"></a> -<span class="sourceLineNo">417</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.417"></a> -<span class="sourceLineNo">418</span> }<a name="line.418"></a> -<span class="sourceLineNo">419</span> seekOnFs(originalPosition);<a name="line.419"></a> -<span class="sourceLineNo">420</span> return false;<a name="line.420"></a> -<span class="sourceLineNo">421</span> }<a name="line.421"></a> -<span class="sourceLineNo">422</span> return true;<a name="line.422"></a> -<span class="sourceLineNo">423</span> }<a name="line.423"></a> -<span class="sourceLineNo">424</span> }<a name="line.424"></a> -<span class="sourceLineNo">425</span><a name="line.425"></a> -<span class="sourceLineNo">426</span> private IOException extractHiddenEof(Exception ex) {<a name="line.426"></a> -<span class="sourceLineNo">427</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.427"></a> -<span class="sourceLineNo">428</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.428"></a> -<span class="sourceLineNo">429</span> IOException ioEx = null;<a name="line.429"></a> -<span class="sourceLineNo">430</span> if (ex instanceof EOFException) {<a name="line.430"></a> -<span class="sourceLineNo">431</span> return (EOFException)ex;<a name="line.431"></a> -<span class="sourceLineNo">432</span> } else if (ex instanceof IOException) {<a name="line.432"></a> -<span class="sourceLineNo">433</span> ioEx = (IOException)ex;<a name="line.433"></a> -<span class="sourceLineNo">434</span> } else if (ex instanceof RuntimeException<a name="line.434"></a> -<span class="sourceLineNo">435</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.435"></a> -<span class="sourceLineNo">436</span> ioEx = (IOException)ex.getCause();<a name="line.436"></a> -<span class="sourceLineNo">437</span> }<a name="line.437"></a> -<span class="sourceLineNo">438</span> if (ioEx != null) {<a name="line.438"></a> -<span class="sourceLineNo">439</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.439"></a> -<span class="sourceLineNo">440</span> return null;<a name="line.440"></a> -<span class="sourceLineNo">441</span> }<a name="line.441"></a> -<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> -<span class="sourceLineNo">443</span> }<a name="line.443"></a> -<span class="sourceLineNo">444</span><a name="line.444"></a> -<span class="sourceLineNo">445</span> @Override<a name="line.445"></a> -<span class="sourceLineNo">446</span> protected void seekOnFs(long pos) throws IOException {<a name="line.446"></a> -<span class="sourceLineNo">447</span> this.inputStream.seek(pos);<a name="line.447"></a> -<span class="sourceLineNo">448</span> }<a name="line.448"></a> -<span class="sourceLineNo">449</span>}<a name="line.449"></a> +<span class="sourceLineNo">315</span> } else {<a name="line.315"></a> +<span class="sourceLineNo">316</span> this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();<a name="line.316"></a> +<span class="sourceLineNo">317</span> }<a name="line.317"></a> +<span class="sourceLineNo">318</span> }<a name="line.318"></a> +<span class="sourceLineNo">319</span><a name="line.319"></a> +<span class="sourceLineNo">320</span> @Override<a name="line.320"></a> +<span class="sourceLineNo">321</span> protected boolean hasCompression() {<a name="line.321"></a> +<span class="sourceLineNo">322</span> return this.hasCompression;<a name="line.322"></a> +<span class="sourceLineNo">323</span> }<a name="line.323"></a> +<span class="sourceLineNo">324</span><a name="line.324"></a> +<span class="sourceLineNo">325</span> @Override<a name="line.325"></a> +<span class="sourceLineNo">326</span> protected boolean hasTagCompression() {<a name="line.326"></a> +<span class="sourceLineNo">327</span> return this.hasTagCompression;<a name="line.327"></a> +<span class="sourceLineNo">328</span> }<a name="line.328"></a> +<span class="sourceLineNo">329</span><a name="line.329"></a> +<span class="sourceLineNo">330</span> @Override<a name="line.330"></a> +<span class="sourceLineNo">331</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.331"></a> +<span class="sourceLineNo">332</span> while (true) {<a name="line.332"></a> +<span class="sourceLineNo">333</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.333"></a> +<span class="sourceLineNo">334</span> long originalPosition = this.inputStream.getPos();<a name="line.334"></a> +<span class="sourceLineNo">335</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.335"></a> +<span class="sourceLineNo">336</span> if (LOG.isTraceEnabled()) {<a name="line.336"></a> +<span class="sourceLineNo">337</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.337"></a> +<span class="sourceLineNo">338</span> }<a name="line.338"></a> +<span class="sourceLineNo">339</span> return false;<a name="line.339"></a> +<span class="sourceLineNo">340</span> }<a name="line.340"></a> +<span class="sourceLineNo">341</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.341"></a> +<span class="sourceLineNo">342</span> long size = 0;<a name="line.342"></a> +<span class="sourceLineNo">343</span> try {<a name="line.343"></a> +<span class="sourceLineNo">344</span> long available = -1;<a name="line.344"></a> +<span class="sourceLineNo">345</span> try {<a name="line.345"></a> +<span class="sourceLineNo">346</span> int firstByte = this.inputStream.read();<a name="line.346"></a> +<span class="sourceLineNo">347</span> if (firstByte == -1) {<a name="line.347"></a> +<span class="sourceLineNo">348</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.348"></a> +<span class="sourceLineNo">349</span> }<a name="line.349"></a> +<span class="sourceLineNo">350</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.350"></a> +<span class="sourceLineNo">351</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.351"></a> +<span class="sourceLineNo">352</span> available = this.inputStream.available();<a name="line.352"></a> +<span class="sourceLineNo">353</span> if (available > 0 && available < size) {<a name="line.353"></a> +<span class="sourceLineNo">354</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.354"></a> +<span class="sourceLineNo">355</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.355"></a> +<span class="sourceLineNo">356</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.356"></a> +<span class="sourceLineNo">357</span> }<a name="line.357"></a> +<span class="sourceLineNo">358</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.358"></a> +<span class="sourceLineNo">359</span> (int)size);<a name="line.359"></a> +<span class="sourceLineNo">360</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.360"></a> +<span class="sourceLineNo">361</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.361"></a> +<span class="sourceLineNo">362</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.362"></a> +<span class="sourceLineNo">363</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.363"></a> +<span class="sourceLineNo">364</span> }<a name="line.364"></a> +<span class="sourceLineNo">365</span> if (!builder.isInitialized()) {<a name="line.365"></a> +<span class="sourceLineNo">366</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.366"></a> +<span class="sourceLineNo">367</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.367"></a> +<span class="sourceLineNo">368</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.368"></a> +<span class="sourceLineNo">369</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.369"></a> +<span class="sourceLineNo">370</span> }<a name="line.370"></a> +<span class="sourceLineNo">371</span> WALKey walKey = builder.build();<a name="line.371"></a> +<span class="sourceLineNo">372</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.372"></a> +<span class="sourceLineNo">373</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.373"></a> +<span class="sourceLineNo">374</span> if (LOG.isTraceEnabled()) {<a name="line.374"></a> +<span class="sourceLineNo">375</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.375"></a> +<span class="sourceLineNo">376</span> this.inputStream.getPos());<a name="line.376"></a> +<span class="sourceLineNo">377</span> }<a name="line.377"></a> +<span class="sourceLineNo">378</span> continue;<a name="line.378"></a> +<span class="sourceLineNo">379</span> }<a name="line.379"></a> +<span class="sourceLineNo">380</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.380"></a> +<span class="sourceLineNo">381</span> long posBefore = this.inputStream.getPos();<a name="line.381"></a> +<span class="sourceLineNo">382</span> try {<a name="line.382"></a> +<span class="sourceLineNo">383</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.383"></a> +<span class="sourceLineNo">384</span> if (expectedCells != actualCells) {<a name="line.384"></a> +<span class="sourceLineNo">385</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.385"></a> +<span class="sourceLineNo">386</span> }<a name="line.386"></a> +<span class="sourceLineNo">387</span> } catch (Exception ex) {<a name="line.387"></a> +<span class="sourceLineNo">388</span> String posAfterStr = "<unknown>";<a name="line.388"></a> +<span class="sourceLineNo">389</span> try {<a name="line.389"></a> +<span class="sourceLineNo">390</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.390"></a> +<span class="sourceLineNo">391</span> } catch (Throwable t) {<a name="line.391"></a> +<span class="sourceLineNo">392</span> if (LOG.isTraceEnabled()) {<a name="line.392"></a> +<span class="sourceLineNo">393</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.393"></a> +<span class="sourceLineNo">394</span> }<a name="line.394"></a> +<span class="sourceLineNo">395</span> }<a name="line.395"></a> +<span class="sourceLineNo">396</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.396"></a> +<span class="sourceLineNo">397</span> + posBefore + " and read up to " + posAfterStr;<a name="line.397"></a> +<span class="sourceLineNo">398</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.398"></a> +<span class="sourceLineNo">399</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.399"></a> +<span class="sourceLineNo">400</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.400"></a> +<span class="sourceLineNo">401</span> }<a name="line.401"></a> +<span class="sourceLineNo">402</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.402"></a> +<span class="sourceLineNo">403</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.403"></a> +<span class="sourceLineNo">404</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.404"></a> +<span class="sourceLineNo">405</span> + this.walEditsStopOffset);<a name="line.405"></a> +<span class="sourceLineNo">406</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.406"></a> +<span class="sourceLineNo">407</span> }<a name="line.407"></a> +<span class="sourceLineNo">408</span> } catch (EOFException eof) {<a name="line.408"></a> +<span class="sourceLineNo">409</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.409"></a> +<span class="sourceLineNo">410</span> if (originalPosition < 0) {<a name="line.410"></a> +<span class="sourceLineNo">411</span> if (LOG.isTraceEnabled()) {<a name="line.411"></a> +<span class="sourceLineNo">412</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.412"></a> +<span class="sourceLineNo">413</span> }<a name="line.413"></a> +<span class="sourceLineNo">414</span> throw eof;<a name="line.414"></a> +<span class="sourceLineNo">415</span> }<a name="line.415"></a> +<span class="sourceLineNo">416</span> // Else restore our position to original location in hope that next time through we will<a name="line.416"></a> +<span class="sourceLineNo">417</span> // read successfully.<a name="line.417"></a> +<span class="sourceLineNo">418</span> if (LOG.isTraceEnabled()) {<a name="line.418"></a> +<span class="sourceLineNo">419</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.419"></a> +<span class="sourceLineNo">420</span> }<a name="line.420"></a> +<span class="sourceLineNo">421</span> seekOnFs(originalPosition);<a name="line.421"></a> +<span class="sourceLineNo">422</span> return false;<a name="line.422"></a> +<span class="sourceLineNo">423</span> }<a name="line.423"></a> +<span class="sourceLineNo">424</span> return true;<a name="line.424"></a> +<span class="sourceLineNo">425</span> }<a name="line.425"></a> +<span class="sourceLineNo">426</span> }<a name="line.426"></a> +<span class="sourceLineNo">427</span><a name="line.427"></a> +<span class="sourceLineNo">428</span> private IOException extractHiddenEof(Exception ex) {<a name="line.428"></a> +<span class="sourceLineNo">429</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.429"></a> +<span class="sourceLineNo">430</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.430"></a> +<span class="sourceLineNo">431</span> IOException ioEx = null;<a name="line.431"></a> +<span class="sourceLineNo">432</span> if (ex instanceof EOFException) {<a name="line.432"></a> +<span class="sourceLineNo">433</span> return (EOFException)ex;<a name="line.433"></a> +<span class="sourceLineNo">434</span> } else if (ex instanceof IOException) {<a name="line.434"></a> +<span class="sourceLineNo">435</span> ioEx = (IOException)ex;<a name="line.435"></a> +<span class="sourceLineNo">436</span> } else if (ex instanceof RuntimeException<a name="line.436"></a> +<span class="sourceLineNo">437</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.437"></a> +<span class="sourceLineNo">438</span> ioEx = (IOException)ex.getCause();<a name="line.438"></a> +<span class="sourceLineNo">439</span> }<a name="line.439"></a> +<span class="sourceLineNo">440</span> if (ioEx != null) {<a name="line.440"></a> +<span class="sourceLineNo">441</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.441"></a> +<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> +<span class="sourceLineNo">443</span> }<a name="line.443"></a> +<span class="sourceLineNo">444</span> return null;<a name="line.444"></a> +<span class="sourceLineNo">445</span> }<a name="line.445"></a> +<span class="sourceLineNo">446</span><a name="line.446"></a> +<span class="sourceLineNo">447</span> @Override<a name="line.447"></a> +<span class="sourceLineNo">448</span> protected void seekOnFs(long pos) throws IOException {<a name="line.448"></a> +<span class="sourceLineNo">449</span> this.inputStream.seek(pos);<a name="line.449"></a> +<span class="sourceLineNo">450</span> }<a name="line.450"></a> +<span class="sourceLineNo">451</span>}<a name="line.451"></a> http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e11cf2cb/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.html index fdc5a8a..62e604e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.html @@ -320,141 +320,143 @@ <span class="sourceLineNo">312</span> this.cellDecoder = codec.getDecoder(this.inputStream);<a name="line.312"></a> <span class="sourceLineNo">313</span> if (this.hasCompression) {<a name="line.313"></a> <span class="sourceLineNo">314</span> this.byteStringUncompressor = codec.getByteStringUncompressor();<a name="line.314"></a> -<span class="sourceLineNo">315</span> }<a name="line.315"></a> -<span class="sourceLineNo">316</span> }<a name="line.316"></a> -<span class="sourceLineNo">317</span><a name="line.317"></a> -<span class="sourceLineNo">318</span> @Override<a name="line.318"></a> -<span class="sourceLineNo">319</span> protected boolean hasCompression() {<a name="line.319"></a> -<span class="sourceLineNo">320</span> return this.hasCompression;<a name="line.320"></a> -<span class="sourceLineNo">321</span> }<a name="line.321"></a> -<span class="sourceLineNo">322</span><a name="line.322"></a> -<span class="sourceLineNo">323</span> @Override<a name="line.323"></a> -<span class="sourceLineNo">324</span> protected boolean hasTagCompression() {<a name="line.324"></a> -<span class="sourceLineNo">325</span> return this.hasTagCompression;<a name="line.325"></a> -<span class="sourceLineNo">326</span> }<a name="line.326"></a> -<span class="sourceLineNo">327</span><a name="line.327"></a> -<span class="sourceLineNo">328</span> @Override<a name="line.328"></a> -<span class="sourceLineNo">329</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.329"></a> -<span class="sourceLineNo">330</span> while (true) {<a name="line.330"></a> -<span class="sourceLineNo">331</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.331"></a> -<span class="sourceLineNo">332</span> long originalPosition = this.inputStream.getPos();<a name="line.332"></a> -<span class="sourceLineNo">333</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.333"></a> -<span class="sourceLineNo">334</span> if (LOG.isTraceEnabled()) {<a name="line.334"></a> -<span class="sourceLineNo">335</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.335"></a> -<span class="sourceLineNo">336</span> }<a name="line.336"></a> -<span class="sourceLineNo">337</span> return false;<a name="line.337"></a> -<span class="sourceLineNo">338</span> }<a name="line.338"></a> -<span class="sourceLineNo">339</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.339"></a> -<span class="sourceLineNo">340</span> long size = 0;<a name="line.340"></a> -<span class="sourceLineNo">341</span> try {<a name="line.341"></a> -<span class="sourceLineNo">342</span> long available = -1;<a name="line.342"></a> -<span class="sourceLineNo">343</span> try {<a name="line.343"></a> -<span class="sourceLineNo">344</span> int firstByte = this.inputStream.read();<a name="line.344"></a> -<span class="sourceLineNo">345</span> if (firstByte == -1) {<a name="line.345"></a> -<span class="sourceLineNo">346</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.346"></a> -<span class="sourceLineNo">347</span> }<a name="line.347"></a> -<span class="sourceLineNo">348</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.348"></a> -<span class="sourceLineNo">349</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.349"></a> -<span class="sourceLineNo">350</span> available = this.inputStream.available();<a name="line.350"></a> -<span class="sourceLineNo">351</span> if (available > 0 && available < size) {<a name="line.351"></a> -<span class="sourceLineNo">352</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.352"></a> -<span class="sourceLineNo">353</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.353"></a> -<span class="sourceLineNo">354</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.354"></a> -<span class="sourceLineNo">355</span> }<a name="line.355"></a> -<span class="sourceLineNo">356</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.356"></a> -<span class="sourceLineNo">357</span> (int)size);<a name="line.357"></a> -<span class="sourceLineNo">358</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.358"></a> -<span class="sourceLineNo">359</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.359"></a> -<span class="sourceLineNo">360</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.360"></a> -<span class="sourceLineNo">361</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.361"></a> -<span class="sourceLineNo">362</span> }<a name="line.362"></a> -<span class="sourceLineNo">363</span> if (!builder.isInitialized()) {<a name="line.363"></a> -<span class="sourceLineNo">364</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.364"></a> -<span class="sourceLineNo">365</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.365"></a> -<span class="sourceLineNo">366</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.366"></a> -<span class="sourceLineNo">367</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.367"></a> -<span class="sourceLineNo">368</span> }<a name="line.368"></a> -<span class="sourceLineNo">369</span> WALKey walKey = builder.build();<a name="line.369"></a> -<span class="sourceLineNo">370</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.370"></a> -<span class="sourceLineNo">371</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.371"></a> -<span class="sourceLineNo">372</span> if (LOG.isTraceEnabled()) {<a name="line.372"></a> -<span class="sourceLineNo">373</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.373"></a> -<span class="sourceLineNo">374</span> this.inputStream.getPos());<a name="line.374"></a> -<span class="sourceLineNo">375</span> }<a name="line.375"></a> -<span class="sourceLineNo">376</span> continue;<a name="line.376"></a> -<span class="sourceLineNo">377</span> }<a name="line.377"></a> -<span class="sourceLineNo">378</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.378"></a> -<span class="sourceLineNo">379</span> long posBefore = this.inputStream.getPos();<a name="line.379"></a> -<span class="sourceLineNo">380</span> try {<a name="line.380"></a> -<span class="sourceLineNo">381</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.381"></a> -<span class="sourceLineNo">382</span> if (expectedCells != actualCells) {<a name="line.382"></a> -<span class="sourceLineNo">383</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.383"></a> -<span class="sourceLineNo">384</span> }<a name="line.384"></a> -<span class="sourceLineNo">385</span> } catch (Exception ex) {<a name="line.385"></a> -<span class="sourceLineNo">386</span> String posAfterStr = "<unknown>";<a name="line.386"></a> -<span class="sourceLineNo">387</span> try {<a name="line.387"></a> -<span class="sourceLineNo">388</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.388"></a> -<span class="sourceLineNo">389</span> } catch (Throwable t) {<a name="line.389"></a> -<span class="sourceLineNo">390</span> if (LOG.isTraceEnabled()) {<a name="line.390"></a> -<span class="sourceLineNo">391</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.391"></a> -<span class="sourceLineNo">392</span> }<a name="line.392"></a> -<span class="sourceLineNo">393</span> }<a name="line.393"></a> -<span class="sourceLineNo">394</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.394"></a> -<span class="sourceLineNo">395</span> + posBefore + " and read up to " + posAfterStr;<a name="line.395"></a> -<span class="sourceLineNo">396</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.396"></a> -<span class="sourceLineNo">397</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.397"></a> -<span class="sourceLineNo">398</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.398"></a> -<span class="sourceLineNo">399</span> }<a name="line.399"></a> -<span class="sourceLineNo">400</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.400"></a> -<span class="sourceLineNo">401</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.401"></a> -<span class="sourceLineNo">402</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.402"></a> -<span class="sourceLineNo">403</span> + this.walEditsStopOffset);<a name="line.403"></a> -<span class="sourceLineNo">404</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.404"></a> -<span class="sourceLineNo">405</span> }<a name="line.405"></a> -<span class="sourceLineNo">406</span> } catch (EOFException eof) {<a name="line.406"></a> -<span class="sourceLineNo">407</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.407"></a> -<span class="sourceLineNo">408</span> if (originalPosition < 0) {<a name="line.408"></a> -<span class="sourceLineNo">409</span> if (LOG.isTraceEnabled()) {<a name="line.409"></a> -<span class="sourceLineNo">410</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.410"></a> -<span class="sourceLineNo">411</span> }<a name="line.411"></a> -<span class="sourceLineNo">412</span> throw eof;<a name="line.412"></a> -<span class="sourceLineNo">413</span> }<a name="line.413"></a> -<span class="sourceLineNo">414</span> // Else restore our position to original location in hope that next time through we will<a name="line.414"></a> -<span class="sourceLineNo">415</span> // read successfully.<a name="line.415"></a> -<span class="sourceLineNo">416</span> if (LOG.isTraceEnabled()) {<a name="line.416"></a> -<span class="sourceLineNo">417</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.417"></a> -<span class="sourceLineNo">418</span> }<a name="line.418"></a> -<span class="sourceLineNo">419</span> seekOnFs(originalPosition);<a name="line.419"></a> -<span class="sourceLineNo">420</span> return false;<a name="line.420"></a> -<span class="sourceLineNo">421</span> }<a name="line.421"></a> -<span class="sourceLineNo">422</span> return true;<a name="line.422"></a> -<span class="sourceLineNo">423</span> }<a name="line.423"></a> -<span class="sourceLineNo">424</span> }<a name="line.424"></a> -<span class="sourceLineNo">425</span><a name="line.425"></a> -<span class="sourceLineNo">426</span> private IOException extractHiddenEof(Exception ex) {<a name="line.426"></a> -<span class="sourceLineNo">427</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.427"></a> -<span class="sourceLineNo">428</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.428"></a> -<span class="sourceLineNo">429</span> IOException ioEx = null;<a name="line.429"></a> -<span class="sourceLineNo">430</span> if (ex instanceof EOFException) {<a name="line.430"></a> -<span class="sourceLineNo">431</span> return (EOFException)ex;<a name="line.431"></a> -<span class="sourceLineNo">432</span> } else if (ex instanceof IOException) {<a name="line.432"></a> -<span class="sourceLineNo">433</span> ioEx = (IOException)ex;<a name="line.433"></a> -<span class="sourceLineNo">434</span> } else if (ex instanceof RuntimeException<a name="line.434"></a> -<span class="sourceLineNo">435</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.435"></a> -<span class="sourceLineNo">436</span> ioEx = (IOException)ex.getCause();<a name="line.436"></a> -<span class="sourceLineNo">437</span> }<a name="line.437"></a> -<span class="sourceLineNo">438</span> if (ioEx != null) {<a name="line.438"></a> -<span class="sourceLineNo">439</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.439"></a> -<span class="sourceLineNo">440</span> return null;<a name="line.440"></a> -<span class="sourceLineNo">441</span> }<a name="line.441"></a> -<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> -<span class="sourceLineNo">443</span> }<a name="line.443"></a> -<span class="sourceLineNo">444</span><a name="line.444"></a> -<span class="sourceLineNo">445</span> @Override<a name="line.445"></a> -<span class="sourceLineNo">446</span> protected void seekOnFs(long pos) throws IOException {<a name="line.446"></a> -<span class="sourceLineNo">447</span> this.inputStream.seek(pos);<a name="line.447"></a> -<span class="sourceLineNo">448</span> }<a name="line.448"></a> -<span class="sourceLineNo">449</span>}<a name="line.449"></a> +<span class="sourceLineNo">315</span> } else {<a name="line.315"></a> +<span class="sourceLineNo">316</span> this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();<a name="line.316"></a> +<span class="sourceLineNo">317</span> }<a name="line.317"></a> +<span class="sourceLineNo">318</span> }<a name="line.318"></a> +<span class="sourceLineNo">319</span><a name="line.319"></a> +<span class="sourceLineNo">320</span> @Override<a name="line.320"></a> +<span class="sourceLineNo">321</span> protected boolean hasCompression() {<a name="line.321"></a> +<span class="sourceLineNo">322</span> return this.hasCompression;<a name="line.322"></a> +<span class="sourceLineNo">323</span> }<a name="line.323"></a> +<span class="sourceLineNo">324</span><a name="line.324"></a> +<span class="sourceLineNo">325</span> @Override<a name="line.325"></a> +<span class="sourceLineNo">326</span> protected boolean hasTagCompression() {<a name="line.326"></a> +<span class="sourceLineNo">327</span> return this.hasTagCompression;<a name="line.327"></a> +<span class="sourceLineNo">328</span> }<a name="line.328"></a> +<span class="sourceLineNo">329</span><a name="line.329"></a> +<span class="sourceLineNo">330</span> @Override<a name="line.330"></a> +<span class="sourceLineNo">331</span> protected boolean readNext(Entry entry) throws IOException {<a name="line.331"></a> +<span class="sourceLineNo">332</span> while (true) {<a name="line.332"></a> +<span class="sourceLineNo">333</span> // OriginalPosition might be < 0 on local fs; if so, it is useless to us.<a name="line.333"></a> +<span class="sourceLineNo">334</span> long originalPosition = this.inputStream.getPos();<a name="line.334"></a> +<span class="sourceLineNo">335</span> if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {<a name="line.335"></a> +<span class="sourceLineNo">336</span> if (LOG.isTraceEnabled()) {<a name="line.336"></a> +<span class="sourceLineNo">337</span> LOG.trace("Reached end of expected edits area at offset " + originalPosition);<a name="line.337"></a> +<span class="sourceLineNo">338</span> }<a name="line.338"></a> +<span class="sourceLineNo">339</span> return false;<a name="line.339"></a> +<span class="sourceLineNo">340</span> }<a name="line.340"></a> +<span class="sourceLineNo">341</span> WALKey.Builder builder = WALKey.newBuilder();<a name="line.341"></a> +<span class="sourceLineNo">342</span> long size = 0;<a name="line.342"></a> +<span class="sourceLineNo">343</span> try {<a name="line.343"></a> +<span class="sourceLineNo">344</span> long available = -1;<a name="line.344"></a> +<span class="sourceLineNo">345</span> try {<a name="line.345"></a> +<span class="sourceLineNo">346</span> int firstByte = this.inputStream.read();<a name="line.346"></a> +<span class="sourceLineNo">347</span> if (firstByte == -1) {<a name="line.347"></a> +<span class="sourceLineNo">348</span> throw new EOFException("First byte is negative at offset " + originalPosition);<a name="line.348"></a> +<span class="sourceLineNo">349</span> }<a name="line.349"></a> +<span class="sourceLineNo">350</span> size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);<a name="line.350"></a> +<span class="sourceLineNo">351</span> // available may be < 0 on local fs for instance. If so, can't depend on it.<a name="line.351"></a> +<span class="sourceLineNo">352</span> available = this.inputStream.available();<a name="line.352"></a> +<span class="sourceLineNo">353</span> if (available > 0 && available < size) {<a name="line.353"></a> +<span class="sourceLineNo">354</span> throw new EOFException("Available stream not enough for edit, " +<a name="line.354"></a> +<span class="sourceLineNo">355</span> "inputStream.available()= " + this.inputStream.available() + ", " +<a name="line.355"></a> +<span class="sourceLineNo">356</span> "entry size= " + size + " at offset = " + this.inputStream.getPos());<a name="line.356"></a> +<span class="sourceLineNo">357</span> }<a name="line.357"></a> +<span class="sourceLineNo">358</span> ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),<a name="line.358"></a> +<span class="sourceLineNo">359</span> (int)size);<a name="line.359"></a> +<span class="sourceLineNo">360</span> } catch (InvalidProtocolBufferException ipbe) {<a name="line.360"></a> +<span class="sourceLineNo">361</span> throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +<a name="line.361"></a> +<span class="sourceLineNo">362</span> originalPosition + ", currentPosition=" + this.inputStream.getPos() +<a name="line.362"></a> +<span class="sourceLineNo">363</span> ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);<a name="line.363"></a> +<span class="sourceLineNo">364</span> }<a name="line.364"></a> +<span class="sourceLineNo">365</span> if (!builder.isInitialized()) {<a name="line.365"></a> +<span class="sourceLineNo">366</span> // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.<a name="line.366"></a> +<span class="sourceLineNo">367</span> // If we can get the KV count, we could, theoretically, try to get next record.<a name="line.367"></a> +<span class="sourceLineNo">368</span> throw new EOFException("Partial PB while reading WAL, " +<a name="line.368"></a> +<span class="sourceLineNo">369</span> "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());<a name="line.369"></a> +<span class="sourceLineNo">370</span> }<a name="line.370"></a> +<span class="sourceLineNo">371</span> WALKey walKey = builder.build();<a name="line.371"></a> +<span class="sourceLineNo">372</span> entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);<a name="line.372"></a> +<span class="sourceLineNo">373</span> if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {<a name="line.373"></a> +<span class="sourceLineNo">374</span> if (LOG.isTraceEnabled()) {<a name="line.374"></a> +<span class="sourceLineNo">375</span> LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +<a name="line.375"></a> +<span class="sourceLineNo">376</span> this.inputStream.getPos());<a name="line.376"></a> +<span class="sourceLineNo">377</span> }<a name="line.377"></a> +<span class="sourceLineNo">378</span> continue;<a name="line.378"></a> +<span class="sourceLineNo">379</span> }<a name="line.379"></a> +<span class="sourceLineNo">380</span> int expectedCells = walKey.getFollowingKvCount();<a name="line.380"></a> +<span class="sourceLineNo">381</span> long posBefore = this.inputStream.getPos();<a name="line.381"></a> +<span class="sourceLineNo">382</span> try {<a name="line.382"></a> +<span class="sourceLineNo">383</span> int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);<a name="line.383"></a> +<span class="sourceLineNo">384</span> if (expectedCells != actualCells) {<a name="line.384"></a> +<span class="sourceLineNo">385</span> throw new EOFException("Only read " + actualCells); // other info added in catch<a name="line.385"></a> +<span class="sourceLineNo">386</span> }<a name="line.386"></a> +<span class="sourceLineNo">387</span> } catch (Exception ex) {<a name="line.387"></a> +<span class="sourceLineNo">388</span> String posAfterStr = "<unknown>";<a name="line.388"></a> +<span class="sourceLineNo">389</span> try {<a name="line.389"></a> +<span class="sourceLineNo">390</span> posAfterStr = this.inputStream.getPos() + "";<a name="line.390"></a> +<span class="sourceLineNo">391</span> } catch (Throwable t) {<a name="line.391"></a> +<span class="sourceLineNo">392</span> if (LOG.isTraceEnabled()) {<a name="line.392"></a> +<span class="sourceLineNo">393</span> LOG.trace("Error getting pos for error message - ignoring", t);<a name="line.393"></a> +<span class="sourceLineNo">394</span> }<a name="line.394"></a> +<span class="sourceLineNo">395</span> }<a name="line.395"></a> +<span class="sourceLineNo">396</span> String message = " while reading " + expectedCells + " WAL KVs; started reading at "<a name="line.396"></a> +<span class="sourceLineNo">397</span> + posBefore + " and read up to " + posAfterStr;<a name="line.397"></a> +<span class="sourceLineNo">398</span> IOException realEofEx = extractHiddenEof(ex);<a name="line.398"></a> +<span class="sourceLineNo">399</span> throw (EOFException) new EOFException("EOF " + message).<a name="line.399"></a> +<span class="sourceLineNo">400</span> initCause(realEofEx != null ? realEofEx : ex);<a name="line.400"></a> +<span class="sourceLineNo">401</span> }<a name="line.401"></a> +<span class="sourceLineNo">402</span> if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {<a name="line.402"></a> +<span class="sourceLineNo">403</span> LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path<a name="line.403"></a> +<span class="sourceLineNo">404</span> + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "<a name="line.404"></a> +<span class="sourceLineNo">405</span> + this.walEditsStopOffset);<a name="line.405"></a> +<span class="sourceLineNo">406</span> throw new EOFException("Read WALTrailer while reading WALEdits");<a name="line.406"></a> +<span class="sourceLineNo">407</span> }<a name="line.407"></a> +<span class="sourceLineNo">408</span> } catch (EOFException eof) {<a name="line.408"></a> +<span class="sourceLineNo">409</span> // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)<a name="line.409"></a> +<span class="sourceLineNo">410</span> if (originalPosition < 0) {<a name="line.410"></a> +<span class="sourceLineNo">411</span> if (LOG.isTraceEnabled()) {<a name="line.411"></a> +<span class="sourceLineNo">412</span> LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof);<a name="line.412"></a> +<span class="sourceLineNo">413</span> }<a name="line.413"></a> +<span class="sourceLineNo">414</span> throw eof;<a name="line.414"></a> +<span class="sourceLineNo">415</span> }<a name="line.415"></a> +<span class="sourceLineNo">416</span> // Else restore our position to original location in hope that next time through we will<a name="line.416"></a> +<span class="sourceLineNo">417</span> // read successfully.<a name="line.417"></a> +<span class="sourceLineNo">418</span> if (LOG.isTraceEnabled()) {<a name="line.418"></a> +<span class="sourceLineNo">419</span> LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof);<a name="line.419"></a> +<span class="sourceLineNo">420</span> }<a name="line.420"></a> +<span class="sourceLineNo">421</span> seekOnFs(originalPosition);<a name="line.421"></a> +<span class="sourceLineNo">422</span> return false;<a name="line.422"></a> +<span class="sourceLineNo">423</span> }<a name="line.423"></a> +<span class="sourceLineNo">424</span> return true;<a name="line.424"></a> +<span class="sourceLineNo">425</span> }<a name="line.425"></a> +<span class="sourceLineNo">426</span> }<a name="line.426"></a> +<span class="sourceLineNo">427</span><a name="line.427"></a> +<span class="sourceLineNo">428</span> private IOException extractHiddenEof(Exception ex) {<a name="line.428"></a> +<span class="sourceLineNo">429</span> // There are two problems we are dealing with here. Hadoop stream throws generic exception<a name="line.429"></a> +<span class="sourceLineNo">430</span> // for EOF, not EOFException; and scanner further hides it inside RuntimeException.<a name="line.430"></a> +<span class="sourceLineNo">431</span> IOException ioEx = null;<a name="line.431"></a> +<span class="sourceLineNo">432</span> if (ex instanceof EOFException) {<a name="line.432"></a> +<span class="sourceLineNo">433</span> return (EOFException)ex;<a name="line.433"></a> +<span class="sourceLineNo">434</span> } else if (ex instanceof IOException) {<a name="line.434"></a> +<span class="sourceLineNo">435</span> ioEx = (IOException)ex;<a name="line.435"></a> +<span class="sourceLineNo">436</span> } else if (ex instanceof RuntimeException<a name="line.436"></a> +<span class="sourceLineNo">437</span> && ex.getCause() != null && ex.getCause() instanceof IOException) {<a name="line.437"></a> +<span class="sourceLineNo">438</span> ioEx = (IOException)ex.getCause();<a name="line.438"></a> +<span class="sourceLineNo">439</span> }<a name="line.439"></a> +<span class="sourceLineNo">440</span> if (ioEx != null) {<a name="line.440"></a> +<span class="sourceLineNo">441</span> if (ioEx.getMessage().contains("EOF")) return ioEx;<a name="line.441"></a> +<span class="sourceLineNo">442</span> return null;<a name="line.442"></a> +<span class="sourceLineNo">443</span> }<a name="line.443"></a> +<span class="sourceLineNo">444</span> return null;<a name="line.444"></a> +<span class="sourceLineNo">445</span> }<a name="line.445"></a> +<span class="sourceLineNo">446</span><a name="line.446"></a> +<span class="sourceLineNo">447</span> @Override<a name="line.447"></a> +<span class="sourceLineNo">448</span> protected void seekOnFs(long pos) throws IOException {<a name="line.448"></a> +<span class="sourceLineNo">449</span> this.inputStream.seek(pos);<a name="line.449"></a> +<span class="sourceLineNo">450</span> }<a name="line.450"></a> +<span class="sourceLineNo">451</span>}<a name="line.451"></a>