Repository: activemq Updated Branches: refs/heads/master 32ca1f53f -> 4cddd2c01
http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java new file mode 100644 index 0000000..c08fa00 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/PercentLimitUsage.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usage; + + +public abstract class PercentLimitUsage <T extends Usage> extends Usage<T> { + + protected int percentLimit = 0; + + /** + * @param parent + * @param name + * @param portion + */ + public PercentLimitUsage(T parent, String name, float portion) { + super(parent, name, portion); + } + + public void setPercentLimit(int percentLimit) { + usageLock.writeLock().lock(); + try { + this.percentLimit = percentLimit; + updateLimitBasedOnPercent(); + } finally { + usageLock.writeLock().unlock(); + } + } + + public int getPercentLimit() { + usageLock.readLock().lock(); + try { + return percentLimit; + } finally { + usageLock.readLock().unlock(); + } + } + + protected abstract void updateLimitBasedOnPercent(); +} http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java index b808388..4646551 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.usage; +import java.io.File; + import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.StoreUtil; /** * Used to keep track of how much of something is being used so that a @@ -26,7 +29,7 @@ import org.apache.activemq.store.PersistenceAdapter; * @org.apache.xbean.XBean * */ -public class StoreUsage extends Usage<StoreUsage> { +public class StoreUsage extends PercentLimitUsage<StoreUsage> { private PersistenceAdapter store; @@ -37,11 +40,13 @@ public class StoreUsage extends Usage<StoreUsage> { public StoreUsage(String name, PersistenceAdapter store) { super(null, name, 1.0f); this.store = store; + updateLimitBasedOnPercent(); } public StoreUsage(StoreUsage parent, String name) { super(parent, name, 1.0f); this.store = parent.store; + updateLimitBasedOnPercent(); } @Override @@ -57,7 +62,12 @@ public class StoreUsage extends Usage<StoreUsage> { public void setStore(PersistenceAdapter store) { this.store = store; - onLimitChange(); + if (percentLimit > 0 && store != null) { + //will trigger onLimitChange + updateLimitBasedOnPercent(); + } else { + onLimitChange(); + } } @Override @@ -81,4 +91,21 @@ public class StoreUsage extends Usage<StoreUsage> { return super.waitForSpace(timeout, highWaterMark); } + + @Override + protected void updateLimitBasedOnPercent() { + usageLock.writeLock().lock(); + try { + + if (percentLimit > 0 && store != null) { + File dir = StoreUtil.findParentDirectory(store.getDirectory()); + + if (dir != null) { + this.setLimit(dir.getTotalSpace() * percentLimit / 100); + } + } + } finally { + usageLock.writeLock().unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java index 7ce91b3..f068dbe 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java @@ -17,17 +17,20 @@ package org.apache.activemq.usage; +import java.io.File; + import org.apache.activemq.store.PListStore; +import org.apache.activemq.util.StoreUtil; /** * Used to keep track of how much of something is being used so that a * productive working set usage can be controlled. Main use case is manage * memory usage. - * + * * @org.apache.xbean.XBean - * + * */ -public class TempUsage extends Usage<TempUsage> { +public class TempUsage extends PercentLimitUsage<TempUsage> { private PListStore store; @@ -38,11 +41,13 @@ public class TempUsage extends Usage<TempUsage> { public TempUsage(String name, PListStore store) { super(null, name, 1.0f); this.store = store; + updateLimitBasedOnPercent(); } public TempUsage(TempUsage parent, String name) { super(parent, name, 1.0f); this.store = parent.store; + updateLimitBasedOnPercent(); } @Override @@ -59,6 +64,27 @@ public class TempUsage extends Usage<TempUsage> { public void setStore(PListStore store) { this.store = store; - onLimitChange(); + if (percentLimit > 0 && store != null) { + //will trigger onLimitChange + updateLimitBasedOnPercent(); + } else { + onLimitChange(); + } + } + + @Override + protected void updateLimitBasedOnPercent() { + usageLock.writeLock().lock(); + try { + if (percentLimit > 0 && store != null) { + File dir = StoreUtil.findParentDirectory(store.getDirectory()); + + if (dir != null) { + this.setLimit(dir.getTotalSpace() * percentLimit / 100); + } + } + } finally { + usageLock.writeLock().unlock(); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java b/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java new file mode 100644 index 0000000..77aafa0 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/util/StoreUtil.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.File; + +public class StoreUtil { + + /** + * Utility method to help find the root directory of the store + * + * @param dir + * @return + */ + public static File findParentDirectory(File dir) { + if (dir != null) { + String dirPath = dir.getAbsolutePath(); + if (!dir.isAbsolute()) { + dir = new File(dirPath); + } + + while (dir != null && !dir.isDirectory()) { + dir = dir.getParentFile(); + } + } + return dir; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java new file mode 100644 index 0000000..5eb830c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usage; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.StoreUtil; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * This test is for AMQ-5393 and will check that schedulePeriodForDiskLimitCheck + * properly schedules a task that will update disk limits if the amount of usable disk space drops + * because another process uses up disk space. + * + */ +public class PercentDiskUsageLimitTest { + protected static final Logger LOG = LoggerFactory + .getLogger(PercentDiskUsageLimitTest.class); + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + private BrokerService broker; + private PersistenceAdapter adapter; + private TempUsage tempUsage; + private StoreUsage storeUsage; + private File storeDir; + + @Before + public void setUpBroker() throws Exception { + broker = new BrokerService(); + broker.setPersistent(true); + broker.setDataDirectoryFile(dataFileDir.getRoot()); + broker.setDeleteAllMessagesOnStartup(true); + adapter = broker.getPersistenceAdapter(); + + FileUtils.forceMkdir(adapter.getDirectory()); + FileUtils.forceMkdir(broker.getTempDataStore().getDirectory()); + storeDir = StoreUtil.findParentDirectory(adapter.getDirectory()); + + final SystemUsage systemUsage = broker.getSystemUsage(); + tempUsage = systemUsage.getTempUsage(); + storeUsage = systemUsage.getStoreUsage(); + } + + protected void startBroker() throws Exception { + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + + /** + * + */ + @Test(timeout=30000) + public void testDiskLimit() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent >= 2) { + int maxUsage = freePercent / 2; + + //Set max usage to less than free space so we know that all space can be allocated + storeUsage.setPercentLimit(maxUsage); + tempUsage.setPercentLimit(maxUsage); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the max usage percent * total available space + //within 1 mb + assertEquals(diskLimit, storeDir.getTotalSpace() * maxUsage / 100, 1000000); + } + } + + @Test(timeout=30000) + public void testDiskLimitOverMaxFree() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent > 1) { + storeUsage.setPercentLimit(freePercent + 1); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the usable space + //within 1 mb + assertEquals(diskLimit, storeDir.getUsableSpace(), 1000000); + } + } + + @Test(timeout=30000) + public void testDiskLimitOver100Percent() throws Exception { + int freePercent = getFreePercentage(); + + if (freePercent > 1) { + storeUsage.setPercentLimit(110); + startBroker(); + + long diskLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + + //assert the disk limit is the same as the available space + //within 1 mb + assertEquals(diskLimit, storeDir.getUsableSpace(), 1000000); + } + } + + protected int getFreePercentage() { + File storeDir = StoreUtil.findParentDirectory(adapter.getDirectory()); + return (int) (((double)storeDir.getUsableSpace() / storeDir.getTotalSpace()) * 100); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java index bd3687b..48c6c4b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PeriodicDiskUsageLimitTest.java @@ -22,19 +22,21 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.util.Random; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.usage.StoreUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.StoreUtil; import org.apache.activemq.util.Wait; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,21 +51,31 @@ public class PeriodicDiskUsageLimitTest { protected static final Logger LOG = LoggerFactory .getLogger(PeriodicDiskUsageLimitTest.class); - File dataFileDir = new File("target/test-amq-5393/datadb"); - File testfile = new File("target/test-amq-5393/testfile"); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + File testfile; private BrokerService broker; private PersistenceAdapter adapter; private TempUsage tempUsage; private StoreUsage storeUsage; + protected URI brokerConnectURI; @Before public void setUpBroker() throws Exception { broker = new BrokerService(); broker.setPersistent(true); - broker.setDataDirectoryFile(dataFileDir); + testfile = dataFileDir.newFile(); + broker.setDataDirectoryFile(dataFileDir.getRoot()); broker.setDeleteAllMessagesOnStartup(true); adapter = broker.getPersistenceAdapter(); + TransportConnector connector = broker + .addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:8000")); + connector.setName("tcp"); + + brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri(); + FileUtils.deleteQuietly(testfile); FileUtils.forceMkdir(adapter.getDirectory()); FileUtils.forceMkdir(broker.getTempDataStore().getDirectory()); @@ -82,8 +94,6 @@ public class PeriodicDiskUsageLimitTest { public void stopBroker() throws Exception { broker.stop(); broker.waitUntilStopped(); - FileUtils.deleteQuietly(testfile); - FileUtils.deleteQuietly(dataFileDir); } /** @@ -102,8 +112,48 @@ public class PeriodicDiskUsageLimitTest { final long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); final long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); - //write a 1 meg file to the file system - writeTestFile(1024 * 1024); + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + + //Assert that the usage limits have been decreased because some free space was used + //up by a file + assertTrue("Store Usage should ramp down.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getStoreUsage().getLimit() < originalDisk; + } + })); + + assertTrue("Temp Usage should ramp down.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getTempUsage().getLimit() < originalTmp; + } + })); + } + + /** + * This test will show that if a file is written to take away free space, and + * if the usage limit is now less than the store size plus remaining free space, then + * the usage limits will adjust lower. Then test that size regrows when file is deleted. + */ + @Test(timeout=30000) + public void testDiskUsageAdjustLowerAndHigherUsingPercent() throws Exception { + //set the limit to max space so that if a file is added to eat up free space then + //the broker should adjust the usage limit..add 5% above free space + tempUsage.setPercentLimit(getFreePercentage(broker.getTempDataStore().getDirectory()) + 5); + storeUsage.setPercentLimit(getFreePercentage(adapter.getDirectory()) + 5); + + //set threshold to 1 megabyte + broker.setDiskUsageCheckRegrowThreshold(1024 * 1024); + broker.setSchedulePeriodForDiskUsageCheck(4000); + startBroker(); + + final long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + final long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); //Assert that the usage limits have been decreased because some free space was used //up by a file @@ -120,6 +170,27 @@ public class PeriodicDiskUsageLimitTest { return broker.getSystemUsage().getTempUsage().getLimit() < originalTmp; } })); + + //get the limits and then delete the test file to free up space + final long storeLimit = broker.getSystemUsage().getStoreUsage().getLimit(); + final long tmpLimit = broker.getSystemUsage().getTempUsage().getLimit(); + FileUtils.deleteQuietly(testfile); + + //regrow + assertTrue("Store Usage should ramp up.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getStoreUsage().getLimit() > storeLimit; + } + })); + + //regrow + assertTrue("Temp Usage should ramp up.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getSystemUsage().getTempUsage().getLimit() > tmpLimit; + } + })); } /** @@ -144,6 +215,28 @@ public class PeriodicDiskUsageLimitTest { } /** + * This test shows that the usage limits will not change if the + * schedulePeriodForDiskLimitCheck property is not set because no task will run + */ + @Test(timeout=30000) + public void testDiskLimitCheckNotSetUsingPercent() throws Exception { + tempUsage.setPercentLimit(getFreePercentage(broker.getTempDataStore().getDirectory()) + 5); + storeUsage.setPercentLimit(getFreePercentage(adapter.getDirectory()) + 5); + startBroker(); + + long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + Thread.sleep(3000); + + //assert that the usage limits have not changed because a task should not have run + assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); + } + + /** * This test will show that if a file is written to take away free space, but * if the limit is greater than the store size and the remaining free space, then * the usage limits will not adjust. @@ -169,6 +262,38 @@ public class PeriodicDiskUsageLimitTest { assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); } + /** + * This test will show that if a file is written to take away free space, but + * if the limit is greater than the store size and the remaining free space, then + * the usage limits will not adjust. + */ + @Test(timeout=30000) + public void testDiskUsageStaySameUsingPercent() throws Exception { + //set a limit lower than max available space and set the period to 5 seconds + //only run if at least 4 percent disk space free + int tempFreePercent = getFreePercentage(broker.getTempDataStore().getDirectory()); + int freePercent = getFreePercentage(adapter.getDirectory()); + if (freePercent >= 4 && tempFreePercent >= 4) { + tempUsage.setPercentLimit(freePercent / 2); + storeUsage.setPercentLimit(tempFreePercent / 2); + + broker.setSchedulePeriodForDiskUsageCheck(2000); + startBroker(); + + long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit(); + long originalTmp = broker.getSystemUsage().getTempUsage().getLimit(); + + //write a 2 meg file to the file system + writeTestFile(2 * 1024 * 1024); + Thread.sleep(5000); + + //Assert that the usage limits have not changed because writing a 2 meg file + //did not decrease the the free space below the already set limit + assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit()); + assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit()); + } + } + protected void setLimitMaxSpace() { //Configure store limits to be the max usable space on startup tempUsage.setLimit(broker.getTempDataStore().getDirectory().getUsableSpace()); @@ -179,7 +304,14 @@ public class PeriodicDiskUsageLimitTest { final byte[] data = new byte[size]; final Random rng = new Random(); rng.nextBytes(data); - IOUtils.write(data, new FileOutputStream(testfile)); + try(FileOutputStream stream = new FileOutputStream(testfile)) { + IOUtils.write(data, stream); + } + } + + protected int getFreePercentage(File directory) { + File storeDir = StoreUtil.findParentDirectory(directory); + return (int) (((double)storeDir.getUsableSpace() / storeDir.getTotalSpace()) * 100); } }