Author: rajdavies
Date: Wed May 14 12:36:11 2008
New Revision: 656378
URL: http://svn.apache.org/viewvc?rev=656378&view=rev
Log:
Grow the HashIndex bins as required
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
Wed May 14 12:36:11 2008
@@ -260,6 +260,17 @@
int getIndexPageSize();
/**
+ * set the meximum bin size
+ */
+ void setMaxBinSize(int size);
+
+ /**
+ * @return the maximum bin size
+ * @return
+ */
+ int getMaxBinSize();
+
+ /**
* @return the Index MBean
*/
IndexMBean getIndexMBean();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
Wed May 14 12:36:11 2008
@@ -58,6 +58,7 @@
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+ private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
public MapContainerImpl(File directory, ContainerId id, IndexItem root,
IndexManager indexManager,
DataManager dataManager, boolean persistentIndex) {
@@ -76,6 +77,7 @@
hashIndex.setNumberOfBins(getIndexBinSize());
hashIndex.setKeySize(getIndexKeySize());
hashIndex.setPageSize(getIndexPageSize());
+ hashIndex.setMaximumCapacity(getMaxBinSize());
this.index = hashIndex;
} catch (IOException e) {
LOG.error("Failed to create HashIndex", e);
@@ -566,6 +568,15 @@
public IndexMBean getIndexMBean() {
return (IndexMBean) index;
}
+
+ public int getMaxBinSize() {
+ return maxBinSize;
+ }
+
+ public void setMaxBinSize(int maxBinSize) {
+ this.maxBinSize = maxBinSize;
+ }
+
public String toString() {
@@ -588,7 +599,5 @@
}
buf.append("}");
return buf.toString();
- }
-
-
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
Wed May 14 12:36:11 2008
@@ -42,6 +42,11 @@
public static final int DEFAULT_PAGE_SIZE;
public static final int DEFAULT_KEY_SIZE;
public static final int DEFAULT_BIN_SIZE;
+ public static final int MAXIMUM_CAPACITY = 16384;
+ /**
+ * The load factor used when none specified in constructor.
+ **/
+ static final float DEFAULT_LOAD_FACTOR;
private static final String NAME_PREFIX = "hash-index-";
private static final Log LOG = LogFactory.getLog(HashIndex.class);
private final String name;
@@ -66,6 +71,9 @@
private int pageCacheSize = 10;
private int size;
private int activeBins;
+ private int threshold;
+ private int maximumCapacity=MAXIMUM_CAPACITY;
+ private float loadFactor=0.75f;
/**
@@ -178,6 +186,48 @@
return false;
}
+ /**
+ * @return the threshold
+ */
+ public int getThreshold() {
+ return threshold;
+ }
+
+ /**
+ * @param threshold the threshold to set
+ */
+ public void setThreshold(int threshold) {
+ this.threshold = threshold;
+ }
+
+ /**
+ * @return the loadFactor
+ */
+ public float getLoadFactor() {
+ return loadFactor;
+ }
+
+ /**
+ * @param loadFactor the loadFactor to set
+ */
+ public void setLoadFactor(float loadFactor) {
+ this.loadFactor = loadFactor;
+ }
+
+ /**
+ * @return the maximumCapacity
+ */
+ public int getMaximumCapacity() {
+ return maximumCapacity;
+ }
+
+ /**
+ * @param maximumCapacity the maximumCapacity to set
+ */
+ public void setMaximumCapacity(int maximumCapacity) {
+ this.maximumCapacity = maximumCapacity;
+ }
+
public synchronized int getSize() {
return size;
}
@@ -193,6 +243,7 @@
capacity <<= 1;
}
this.bins = new HashBin[capacity];
+ threshold = calculateThreashold();
keysPerPage = pageSize / keySize;
dataIn = new DataByteArrayInputStream();
dataOut = new DataByteArrayOutputStream(pageSize);
@@ -229,6 +280,9 @@
if (!getBin(key).put(entry)) {
size++;
}
+ if (size >= threshold) {
+ resize(2*bins.length);
+ }
}
public synchronized StoreEntry get(Object key) throws IOException {
@@ -361,11 +415,18 @@
}
void addToBin(HashPage page) throws IOException {
- HashBin bin = getBin(page.getBinId());
+ int index = page.getBinId();
+ if (index >= numberOfBins) {
+ HashBin[] newBins = new HashBin[index+1];
+ System.arraycopy(this.bins, 0, newBins, 0, this.bins.length);
+ this.bins=newBins;
+ }
+ HashBin bin = getBin(index);
bin.addHashPageInfo(page.getId(), page.getPersistedSize());
}
private HashBin getBin(int index) {
+
HashBin result = bins[index];
if (result == null) {
result = new HashBin(this, index, pageSize / keySize);
@@ -464,6 +525,49 @@
doLoad();
}
+ private void resize(int newCapacity) throws IOException {
+ if (bins.length == getMaximumCapacity()) {
+ threshold = Integer.MAX_VALUE;
+ return;
+ }
+ String backFileName = name + "-REISZE";
+ HashIndex backIndex = new
HashIndex(directory,backFileName,indexManager);
+ backIndex.setKeyMarshaller(keyMarshaller);
+ backIndex.setKeySize(getKeySize());
+ backIndex.setNumberOfBins(newCapacity);
+ backIndex.setPageSize(getPageSize());
+ backIndex.load();
+ File backFile = backIndex.file;
+ long offset = 0;
+ while ((offset + pageSize) <= indexFile.length()) {
+ indexFile.seek(offset);
+ HashPage page = getFullPage(offset);
+ if (page.isActive()) {
+ for (HashEntry entry : page.getEntries()) {
+ backIndex.getBin(entry.getKey()).put(entry);
+ backIndex.size++;
+ }
+ }
+ page=null;
+ offset += pageSize;
+ }
+ backIndex.unload();
+
+ unload();
+ IOHelper.deleteFile(file);
+ IOHelper.copyFile(backFile, file);
+ IOHelper.deleteFile(backFile);
+ setNumberOfBins(newCapacity);
+ bins = new HashBin[newCapacity];
+ threshold = calculateThreashold();
+ openIndexFile();
+ doLoad();
+ }
+
+ private int calculateThreashold() {
+ return (int)(bins.length * 100 * loadFactor);
+ }
+
public String toString() {
String str = "HashIndex"+System.identityHashCode(this)+":
"+file.getName();
@@ -488,5 +592,6 @@
DEFAULT_PAGE_SIZE =
Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
DEFAULT_KEY_SIZE =
Integer.parseInt(System.getProperty("defaultKeySize", "96"));
DEFAULT_BIN_SIZE=
Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+
DEFAULT_LOAD_FACTOR=Float.parseFloat(System.getProperty("defaultLoadFactor","1.5f"));
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed May 14 12:36:11 2008
@@ -118,6 +118,7 @@
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+ private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
private int
maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new
ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
private String directoryPath = "";
@@ -685,6 +686,7 @@
adaptor.setIndexBinSize(getIndexBinSize());
adaptor.setIndexKeySize(getIndexKeySize());
adaptor.setIndexPageSize(getIndexPageSize());
+ adaptor.setMaxBinSize(getMaxBinSize());
return adaptor;
}
@@ -833,6 +835,14 @@
public int getIndexPageSize() {
return indexPageSize;
}
+
+ public int getMaxBinSize() {
+ return maxBinSize;
+ }
+
+ public void setMaxBinSize(int maxBinSize) {
+ this.maxBinSize = maxBinSize;
+ }
/**
* When set using XBean, you can use values such as: "20
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=656378&r1=656377&r2=656378&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Wed May 14 12:36:11 2008
@@ -72,6 +72,7 @@
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
+ private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
public KahaReferenceStoreAdapter(AtomicLong size){
@@ -203,6 +204,7 @@
container.setIndexBinSize(getIndexBinSize());
container.setIndexKeySize(getIndexKeySize());
container.setIndexPageSize(getIndexPageSize());
+ container.setMaxBinSize(getIndexBinSize());
container.setKeyMarshaller(new MessageIdMarshaller());
container.setValueMarshaller(new ReferenceRecordMarshaller());
container.load();
@@ -361,4 +363,12 @@
public void setIndexPageSize(int indexPageSize) {
this.indexPageSize = indexPageSize;
}
+
+ public int getMaxBinSize() {
+ return maxBinSize;
+ }
+
+ public void setMaxBinSize(int maxBinSize) {
+ this.maxBinSize = maxBinSize;
+ }
}