GEODE-2398: Updates from review https://reviews.apache.org/r/56506/
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/fb14e9aa Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/fb14e9aa Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/fb14e9aa Branch: refs/heads/feature/GEODE-2449 Commit: fb14e9aab263654ed0176dcc3c9738be1b208a82 Parents: 9b0f165 Author: Ken Howe <kh...@pivotal.io> Authored: Fri Feb 10 16:08:09 2017 -0800 Committer: Ken Howe <kh...@pivotal.io> Committed: Mon Feb 13 13:50:07 2017 -0800 ---------------------------------------------------------------------- .../org/apache/geode/internal/cache/Oplog.java | 49 +++-------- .../geode/internal/cache/OplogFlushTest.java | 89 -------------------- 2 files changed, 13 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/fb14e9aa/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 4e426a0..270c833 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -5189,11 +5189,6 @@ public final class Oplog implements CompactableOplog, Flushable { private static final int MAX_CHANNEL_RETRIES = 5; private final void flush(OplogFile olf, boolean doSync) throws IOException { - int flushed; - int channelBytesWritten; - int numChannelRetries = 0; - int bbStartPos; - long channelStartPos; try { synchronized (this.lock/* olf */) { if (olf.RAFClosed) { @@ -5202,15 +5197,15 @@ public final class Oplog implements CompactableOplog, Flushable { ByteBuffer bb = olf.writeBuf; if (bb != null && bb.position() != 0) { bb.flip(); - flushed = 0; + int flushed = 0; + int numChannelRetries = 
0; do { - channelBytesWritten = 0; - bbStartPos = bb.position(); - channelStartPos = olf.channel.position(); + int channelBytesWritten = 0; + final int bbStartPos = bb.position(); + final long channelStartPos = olf.channel.position(); // differentiate between bytes written on this channel.write() iteration and the // total number of bytes written to the channel on this call channelBytesWritten = olf.channel.write(bb); - flushed += channelBytesWritten; // Expect channelBytesWritten and the changes in bb.position() and channel.position() to // be the same. If they are not, then the channel.write() silently failed. The following // retry separates spurious failures from permanent channel failures. @@ -5218,11 +5213,16 @@ public final class Oplog implements CompactableOplog, Flushable { if (numChannelRetries++ < MAX_CHANNEL_RETRIES) { // Reset the ByteBuffer position, but take into account anything that did get // written to the channel - bb.position(bbStartPos + (int) (olf.channel.position() - channelStartPos)); + channelBytesWritten = (int) (olf.channel.position() - channelStartPos); + bb.position(bbStartPos + channelBytesWritten); } else { - throw new IOException("Failed to write Oplog entry to" + olf.f.getName()); + throw new IOException("Failed to write Oplog entry to" + olf.f.getName() + ": " + + "channel.write() returned " + channelBytesWritten + ", " + + "change in channel position = " + (olf.channel.position() - channelStartPos) + + ", " + "change in source buffer position = " + (bb.position() - bbStartPos)); } } + flushed += channelBytesWritten; } while (bb.hasRemaining()); // update bytesFlushed after entire writeBuffer is flushed to fix bug // 41201 @@ -5247,11 +5247,6 @@ public final class Oplog implements CompactableOplog, Flushable { private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException { try { - long channelStartPos; - long expectedWritten; - long flushed; - int numChannelRetries = 0; - boolean retryWrite = false; 
synchronized (this.lock/* olf */) { if (olf.RAFClosed) { return; @@ -5259,25 +5254,7 @@ public final class Oplog implements CompactableOplog, Flushable { this.bbArray[0] = b1; this.bbArray[1] = b2; b1.flip(); - int b1StartPos = b1.position(); - int b2StartPos = b2.position(); - expectedWritten = b1.limit() - b1StartPos + b2.limit() - b2StartPos; - channelStartPos = olf.channel.position(); - - do { - retryWrite = false; - flushed = olf.channel.write(this.bbArray); - if (flushed != expectedWritten) { - if (numChannelRetries++ < MAX_CHANNEL_RETRIES) { - retryWrite = true; - olf.channel.position(channelStartPos); - b1.position(b1StartPos); - b2.position(b2StartPos); - } else { - throw new IOException("Failed to write Oplog entry to" + olf.f.getName()); - } - } - } while (retryWrite); + long flushed = olf.channel.write(this.bbArray); this.bbArray[0] = null; this.bbArray[1] = null; // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201 http://git-wip-us.apache.org/repos/asf/geode/blob/fb14e9aa/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java index 1d484e4..d24e66d 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java @@ -166,93 +166,4 @@ public class OplogFlushTest extends DiskRegionTestingBase { doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */); } - - class FakeChannelWriteBBArray implements Answer<Long> { - - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - bbArray = ol.bbArray; - return fakeWriteBBArray(ol, bbArray); - } - } - - private long fakeWriteBBArray(Oplog ol, ByteBuffer[] bbA) throws IOException { - 
if (nFakeChannelWrites > 0) { - for (int i = 0; i < bbA.length; ++i) { - bbA[i].position(bbA[i].limit()); - } - --nFakeChannelWrites; - return 0; - } - doCallRealMethod().when(spyCh).write(bbA); - return spyCh.write(bbA); - } - - private void doChannelBBArrayFlushWithFailures(Oplog[] oplogs, int numFailures) - throws IOException { - nFakeChannelWrites = numFailures; - ol = oplogs[0]; - ch = ol.getFileChannel(); - spyCh = spy(ch); - ol.testSetCrfChannel(spyCh); - - byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; - byte[] entry2 = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}; - byte[] entry3 = {32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47}; - - bb1 = ByteBuffer.allocateDirect(entry1.length); - bb2 = ByteBuffer.allocateDirect(entry2.length); - ByteBuffer[] bbArray = ol.bbArray; - try { - // Force channel.write() failures when writing the first entry - doAnswer(new FakeChannelWriteBBArray()).when(spyCh).write(bbArray); - long chStartPos = ol.getFileChannel().position(); - bb1.clear(); - bb1.put(entry1); - bb2.clear(); - bb2.put(entry2); - bb2.flip(); - ol.flush(bb1, bb2); - - // Write the 2nd entry without forced channel failures - nFakeChannelWrites = 0; - bb1.clear(); - bb1.put(entry2); - bb1 = ol.getWriteBuf(); - ol.flushAll(true); - long chEndPos = ol.getFileChannel().position(); - assertEquals("Change in channel position does not equal the size of the data flushed", - entry1.length + entry2.length, chEndPos - chStartPos); - ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length); - ol.getFileChannel().position(chStartPos); - ol.getFileChannel().read(dst); - verifyBB(dst, entry1); - } finally { - region.destroyRegion(); - } - } - - @Test - public void testChannelRecoversFromWriteFailureOfByteBufferArray() throws Exception { - region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); - DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = 
dr.getDiskStore().persistentOplogs.getAllOplogs(); - assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); - assertNotNull("Unexpected null Oplog", oplogs[0]); - - doChannelBBArrayFlushWithFailures(oplogs, 1 /* write failures */); - } - - @Test - public void testOplogFlushOfByteBufferArrayThrowsIOExceptionWhenNumberOfChannelWriteRetriesExceedsLimit() - throws Exception { - expectedException.expect(instanceOf(IOException.class)); - region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); - DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); - assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); - assertNotNull("Unexpected null Oplog", oplogs[0]); - - doChannelBBArrayFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */); - } }