Repository: activemq Updated Branches: refs/heads/activemq-5.14.x b8fc78ec6 -> 7e3f344ea
[AMQ-6606] - reset next batch so it does not get reused after error and refine fix to sync write batches b/c async locations will already be in the index (cherry picked from commit 21ae1ef2e658a14f89bde79a83a32af340fb351a) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7e3f344e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7e3f344e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7e3f344e Branch: refs/heads/activemq-5.14.x Commit: 7e3f344ea7708650bac8452fa585e6125ebe083e Parents: b8fc78e Author: gtully <gary.tu...@gmail.com> Authored: Tue Mar 7 12:28:58 2017 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Mar 7 08:07:33 2017 -0500 ---------------------------------------------------------------------- .../kahadb/disk/journal/DataFileAppender.java | 21 +++++- .../DataFileAppenderNoSpaceNoBatchTest.java | 68 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7e3f344e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 25c4e28..3153a50 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -355,7 +355,11 @@ class DataFileAppender implements FileAppender { synchronized (enqueueMutex) { running = false; signalError(wb, error); - signalError(nextWriteBatch, error); + if (nextWriteBatch != null) { + signalError(nextWriteBatch, error); + nextWriteBatch = null; + enqueueMutex.notifyAll(); + } } } finally { try { @@ -402,12 +406,23 @@ class DataFileAppender implements FileAppender { if (wb != null) { if (t instanceof IOException) { wb.exception.set((IOException) t); - // revert batch increment such that next write is contiguous - wb.dataFile.decrementLength(wb.size); + // revert sync batch increment such that next write is contiguous + if (syncBatch(wb.writes)) { + wb.dataFile.decrementLength(wb.size); + } } else { wb.exception.set(IOExceptionSupport.create(t)); } signalDone(wb); } } + + // async writes will already be in the index so reuse is not an option + private boolean syncBatch(LinkedNodeList<Journal.WriteCommand> writes) { + Journal.WriteCommand write = writes.getHead(); + while (write != null && write.sync) { + write = write.getNext(); + } + return write == null; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/7e3f344e/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java index aa6df3f..ec68d13 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -21,17 +21,27 @@ import org.apache.activemq.util.RecoverableRandomAccessFile; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class DataFileAppenderNoSpaceNoBatchTest { + + private static final Logger LOG = LoggerFactory.getLogger(DataFileAppenderNoSpaceNoBatchTest.class); + @Rule public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); @@ -77,4 +87,62 @@ public class DataFileAppenderNoSpaceNoBatchTest { assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1)); } + + + @Test(timeout = 10000) + public void testNoSpaceNextWriteSameBatchAsync() throws Exception { + final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>()); + + final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) { + public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException { + + return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") { + + public void seek(long pos) throws IOException { + seekPositions.add(pos); + } + + public void write(byte[] bytes, int offset, int len) throws IOException { + if (seekPositions.size() == 2) { + throw new IOException("No space on device: " + seekPositions.size()); + } + } + }; + }; + }; + + underTest = new DataFileAppender(new Journal() { + @Override + public DataFile getCurrentDataFile(int capacity) throws IOException { + return currentDataFile; + }; + + @Override + public int getWriteBatchSize() { + // force multiple async batches + return 4*1024; + } + }); + + final ByteSequence byteSequence = new ByteSequence(new byte[1024]); + + ConcurrentLinkedQueue<Location> locations = new ConcurrentLinkedQueue<Location>(); + HashSet<CountDownLatch> latches = new HashSet<CountDownLatch>(); + for (int i = 0; i <= 20; i++) { + Location location = underTest.storeItem(byteSequence, (byte) 1, false); + locations.add(location); + latches.add(location.getLatch()); + } + + for (CountDownLatch latch: latches) { + assertTrue("write complete", latch.await(5, TimeUnit.SECONDS)); + } + + LOG.info("Latches count: " + latches.size()); + LOG.info("Seeks: " + seekPositions); + + assertTrue("got more than on latch: " + latches.size(), latches.size() > 1); + assertTrue("got seeks: " + seekPositions, seekPositions.size() > 2); + assertEquals("no duplicates: " + seekPositions, seekPositions.size(), new HashSet<Long>(seekPositions).size()); + } }