Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/344
Change subject: Allow lazy loading for persistent local resources
......................................................................
Allow lazy loading for persistent local resources
- Allow indexes to be accessed by name.
- Allow lazy loading for persistent local resources.
- Caching for local resources.
- Fix server start timing issue.
Change-Id: I48b9260a3280750145f6ddb3783673a299055910
---
M
asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
M
asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M
asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
M
asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
M asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
M
asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterix-transactions/pom.xml
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
M
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
15 files changed, 435 insertions(+), 301 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/44/344/1
diff --git
a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 6d7f2a4..6bd9fff 100644
---
a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++
b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -92,7 +92,6 @@
private AsterixTransactionProperties txnProperties;
private AsterixFeedProperties feedProperties;
-
private AsterixThreadExecutor threadExecutor;
private DatasetLifecycleManager indexLifecycleManager;
private IFileMapManager fileMapManager;
@@ -136,7 +135,7 @@
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory = new
PersistentLocalResourceRepositoryFactory(
- ioManager);
+ ioManager, ncApplicationContext.getNodeId());
localResourceRepository = (PersistentLocalResourceRepository)
persistentLocalResourceRepositoryFactory
.createRepository();
resourceIdFactory = (new
ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
@@ -145,10 +144,10 @@
this);
txnSubsystem = new
TransactionSubsystem(ncApplicationContext.getNodeId(),
asterixAppRuntimeContextProvider,
txnProperties);
-
+
indexLifecycleManager = new DatasetLifecycleManager(storageProperties,
localResourceRepository,
-
MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,(LogManager)txnSubsystem.getLogManager());
-
+ MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,
(LogManager) txnSubsystem.getLogManager());
+
isShuttingdown = false;
feedManager = new FeedManager(ncApplicationContext.getNodeId(),
feedProperties,
@@ -239,7 +238,7 @@
public AsterixExternalProperties getExternalProperties() {
return externalProperties;
}
-
+
@Override
public AsterixFeedProperties getFeedProperties() {
return feedProperties;
diff --git
a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 99d883c..9072983 100644
---
a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++
b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -80,7 +80,7 @@
MetadataManager.INSTANCE = new MetadataManager(proxy,
metadataProperties);
AsterixAppContextInfo.getInstance().getCCApplicationContext()
- .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+ .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
AsterixExternalProperties externalProperties =
AsterixAppContextInfo.getInstance().getExternalProperties();
setupWebServer(externalProperties);
@@ -88,16 +88,17 @@
setupJSONAPIServer(externalProperties);
jsonAPIServer.start();
- ExternalLibraryBootstrap.setUpExternaLibraries(false);
setupFeedServer(externalProperties);
feedServer.start();
- centralFeedManager = CentralFeedManager.getInstance();
- centralFeedManager.start();
waitUntilServerStart(webServer);
waitUntilServerStart(jsonAPIServer);
waitUntilServerStart(feedServer);
+
+ ExternalLibraryBootstrap.setUpExternaLibraries(false);
+ centralFeedManager = CentralFeedManager.getInstance();
+ centralFeedManager.start();
AsterixGlobalRecoveryManager.INSTANCE = new
AsterixGlobalRecoveryManager(
(HyracksConnection) getNewHyracksClientConnection());
@@ -178,7 +179,7 @@
feedServer.setHandler(context);
context.addServlet(new ServletHolder(new FeedServlet()), "/");
-
+
// add paths here
}
}
\ No newline at end of file
diff --git
a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 3f02a1a..b6da731 100644
---
a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -73,8 +73,22 @@
}
@Override
- public synchronized IIndex getIndex(long resourceID) throws
HyracksDataException {
- DatasetInfo dsInfo = datasetInfos.get(getDIDfromRID(resourceID));
+ public synchronized IIndex getIndex(String resourceName) throws
HyracksDataException {
+ DatasetInfo dsInfo =
datasetInfos.get(getDIDfromResourceName(resourceName));
+ if (dsInfo == null) {
+ return null;
+ }
+ long resourceID = getResourceIDfromResourceName(resourceName);
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+ if (iInfo == null) {
+ return null;
+ }
+ return iInfo.index;
+ }
+
+ @Override
+ public synchronized IIndex getIndex(int datasetID, long resourceID) throws
HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
if (dsInfo == null) {
return null;
}
@@ -86,8 +100,9 @@
}
@Override
- public synchronized void register(long resourceID, IIndex index) throws
HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void register(String resourceName, IIndex index)
throws HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
dsInfo = new DatasetInfo(did, !index.hasMemoryComponents());
@@ -103,17 +118,27 @@
dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
}
- private int getDIDfromRID(long resourceID) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceById(resourceID);
+ private int getDIDfromResourceName(String resourceName) throws
HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByName(resourceName);
if (lr == null) {
return -1;
}
return ((ILocalResourceMetadata)
lr.getResourceObject()).getDatasetID();
}
+ private long getResourceIDfromResourceName(String resourceName) throws
HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ if (lr == null) {
+ return -1;
+ }
+ return lr.getResourceId();
+ }
+
@Override
- public synchronized void unregister(long resourceID) throws
HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void unregister(String resourceName) throws
HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -153,20 +178,15 @@
dsInfo.indexes.remove(resourceID);
if (dsInfo.referenceCount == 0 && dsInfo.isOpen &&
dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
- datasetInfos.remove(did);
- datasetVirtualBufferCaches.remove(did);
- datasetOpTrackers.remove(did);
+ removeDatasetFromCache(dsInfo.datasetID);
}
}
@Override
- public synchronized void open(long resourceID) throws HyracksDataException
{
- int did = getDIDfromRID(resourceID);
+ public synchronized void open(String resourceName) throws
HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
throw new HyracksDataException("Failed to open index with resource
ID " + resourceID
@@ -247,15 +267,16 @@
}
public DatasetInfo getDatasetInfo(int datasetID) {
-
synchronized (datasetInfos) {
return datasetInfos.get(datasetID);
}
}
@Override
- public synchronized void close(long resourceID) throws
HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void close(String resourceName) throws
HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
throw new HyracksDataException("No index found with resourceID " +
resourceID);
@@ -298,6 +319,17 @@
}
return vbcs;
}
+ }
+
+ private void removeDatasetFromCache(int datasetID) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
+ datasetInfos.remove(datasetID);
+ datasetVirtualBufferCaches.remove(datasetID);
+ datasetOpTrackers.remove(datasetID);
}
public ILSMOperationTracker getOperationTracker(int datasetID) {
@@ -498,7 +530,6 @@
}
if (asyncFlush) {
-
for (IndexInfo iInfo : dsInfo.indexes.values()) {
ILSMIndexAccessor accessor =
iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -546,10 +577,7 @@
}
dsInfo.isOpen = false;
- List<IVirtualBufferCache> vbcs =
getVirtualBufferCaches(dsInfo.datasetID);
- for (IVirtualBufferCache vbc : vbcs) {
- used -= vbc.getNumPages() * vbc.getPageSize();
- }
+ removeDatasetFromCache(dsInfo.datasetID);
}
@Override
@@ -600,4 +628,34 @@
outputStream.write(sb.toString().getBytes());
}
+
+ @Override
+ public boolean supportsPersistentLocalResources() {
+ return true;
+ }
+
+ @Override
+ public void register(long resourceID, IIndex index) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void open(long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void close(long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void unregister(long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized IIndex getIndex(long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 93d9fbc..9e9d2f9 100644
---
a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++
b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -53,7 +53,7 @@
indexHelper.open();
AbstractLSMIndex lsmIndex = (AbstractLSMIndex)
indexHelper.getIndexInstance();
try {
- modCallback =
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+ modCallback =
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(indexHelper.getFileReference().getFile().getPath(),
indexHelper.getResourceID(), lsmIndex, ctx);
indexAccessor = lsmIndex.createAccessor(modCallback,
NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory =
opDesc.getTupleFilterFactory();
diff --git
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b61f410..b16220c 100644
---
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -287,9 +287,9 @@
private void insertTupleIntoIndex(JobId jobId, IMetadataIndex
metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
- ILSMIndex lsmIndex = (ILSMIndex)
indexLifecycleManager.getIndex(resourceID);
+ ILSMIndex lsmIndex = (ILSMIndex)
indexLifecycleManager.getIndex(metadataIndex.getFile().toString());
try {
- indexLifecycleManager.open(resourceID);
+ indexLifecycleManager.open(metadataIndex.getFile().toString());
// prepare a Callback for logging
IModificationOperationCallback modCallback =
createIndexModificationCallback(jobId, resourceID,
@@ -310,7 +310,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(metadataIndex.getFile().toString());
}
}
@@ -636,9 +636,9 @@
private void deleteTupleFromIndex(JobId jobId, IMetadataIndex
metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
- ILSMIndex lsmIndex = (ILSMIndex)
indexLifecycleManager.getIndex(resourceID);
+ ILSMIndex lsmIndex = (ILSMIndex)
indexLifecycleManager.getIndex(metadataIndex.getFile().toString());
try {
- indexLifecycleManager.open(resourceID);
+ indexLifecycleManager.open(metadataIndex.getFile().toString());
// prepare a Callback for logging
IModificationOperationCallback modCallback =
createIndexModificationCallback(jobId, resourceID,
metadataIndex, lsmIndex, IndexOperation.DELETE);
@@ -656,7 +656,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(metadataIndex.getFile().toString());
}
}
@@ -970,8 +970,8 @@
try {
IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ IIndex indexInstance =
indexLifecycleManager.getIndex(index.getFile().toString());
+ indexLifecycleManager.open(index.getFile().toString());
IIndexAccessor indexAccessor =
indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor)
indexAccessor.createSearchCursor(false);
@@ -989,12 +989,12 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(index.getFile().toString());
index = MetadataPrimaryIndexes.DATASET_DATASET;
resourceID = index.getResourceID();
- indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ indexInstance =
indexLifecycleManager.getIndex(index.getFile().toString());
+ indexLifecycleManager.open(index.getFile().toString());
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor)
indexAccessor.createSearchCursor(false);
@@ -1012,12 +1012,12 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(index.getFile().toString());
index = MetadataPrimaryIndexes.INDEX_DATASET;
resourceID = index.getResourceID();
- indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ indexInstance =
indexLifecycleManager.getIndex(index.getFile().toString());
+ indexLifecycleManager.open(index.getFile().toString());
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor)
indexAccessor.createSearchCursor(false);
@@ -1036,7 +1036,7 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(index.getFile().toString());
} catch (Exception e) {
e.printStackTrace();
}
@@ -1046,9 +1046,8 @@
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index,
ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType>
results) throws Exception {
IBinaryComparatorFactory[] comparatorFactories =
index.getKeyBinaryComparatorFactory();
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ IIndex indexInstance =
indexLifecycleManager.getIndex(index.getFile().toString());
+ indexLifecycleManager.open(index.getFile().toString());
IIndexAccessor indexAccessor =
indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor)
indexAccessor.createSearchCursor(false);
@@ -1077,7 +1076,7 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(index.getFile().toString());
}
@Override
@@ -1085,8 +1084,9 @@
int mostRecentDatasetId =
MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
long resourceID =
MetadataPrimaryIndexes.DATASET_DATASET.getResourceID();
try {
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ IIndex indexInstance =
indexLifecycleManager.getIndex(MetadataPrimaryIndexes.DATASET_DATASET.getFile()
+ .toString());
+
indexLifecycleManager.open(MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString());
try {
IIndexAccessor indexAccessor =
indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1113,7 +1113,7 @@
rangeCursor.close();
}
} finally {
- indexLifecycleManager.close(resourceID);
+
indexLifecycleManager.close(MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString());
}
} catch (Exception e) {
diff --git
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 97c90f8..ec1179d 100644
---
a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -123,9 +123,8 @@
MetadataPrimaryIndexes.INDEX_DATASET,
MetadataPrimaryIndexes.NODE_DATASET,
MetadataPrimaryIndexes.NODEGROUP_DATASET,
MetadataPrimaryIndexes.FUNCTION_DATASET,
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
MetadataPrimaryIndexes.FEED_DATASET,
- MetadataPrimaryIndexes.FEED_POLICY_DATASET,
- MetadataPrimaryIndexes.LIBRARY_DATASET,
MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
- MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
+ MetadataPrimaryIndexes.FEED_POLICY_DATASET,
MetadataPrimaryIndexes.LIBRARY_DATASET,
+ MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
secondaryIndexes = new IMetadataIndex[] {
MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
@@ -417,11 +416,11 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory =
localResourceFactoryProvider.getLocalResourceFactory();
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID,
path, 0));
- indexLifecycleManager.register(resourceID, lsmBtree);
+ indexLifecycleManager.register(path, lsmBtree);
} else {
final LocalResource resource =
localResourceRepository.getResourceByName(path);
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
+ lsmBtree = (LSMBTree)
indexLifecycleManager.getIndex(resource.getResourceName());
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(
virtualBufferCaches,
@@ -436,7 +435,7 @@
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager),
opTracker,
runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(),
index.isPrimaryIndex(), null, null, null, null, true);
- indexLifecycleManager.register(resourceID, lsmBtree);
+ indexLifecycleManager.register(path, lsmBtree);
}
}
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index fbec3b8..c3a7555 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you
may
+ not use this file except in compliance with the License. ! you may
obtain
+ a copy of the License from ! !
http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software
!
+ distributed under the License is distributed on an "AS IS" BASIS, !
WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See
the
+ License for the specific language governing permissions and !
limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asterix</artifactId>
@@ -21,14 +17,14 @@
</parent>
<artifactId>asterix-transactions</artifactId>
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
<build>
<plugins>
@@ -42,52 +38,50 @@
<fork>true</fork>
</configuration>
</plugin>
- <plugin>
- <groupId>edu.uci.ics.asterix</groupId>
- <artifactId>record-manager-generator-maven-plugin</artifactId>
- <version>0.8.7-SNAPSHOT</version>
- <configuration>
- <debug>false</debug>
- <inputFiles>
-
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json</param>
-
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json</param>
-
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json</param>
- </inputFiles>
-
<packageName>edu.uci.ics.asterix.transaction.management.service.locking</packageName>
- </configuration>
- <executions>
- <execution>
- <id>generate-record-manager</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>generate-record-manager</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.9</version>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
-
<source>${project.build.directory}/generated-sources/java/</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
+ <plugin>
+ <groupId>edu.uci.ics.asterix</groupId>
+
<artifactId>record-manager-generator-maven-plugin</artifactId>
+ <version>0.8.7-SNAPSHOT</version>
+ <configuration>
+ <debug>false</debug>
+ <inputFiles>
+
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Job.json</param>
+
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Resource.json</param>
+
<param>src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Request.json</param>
+ </inputFiles>
+
<packageName>edu.uci.ics.asterix.transaction.management.service.locking</packageName>
+ </configuration>
+ <executions>
+ <execution>
+ <id>generate-record-manager</id>
+ <phase>generate-sources</phase>
+ <goals>
+
<goal>generate-record-manager</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+
<source>${project.build.directory}/generated-sources/java/</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
-
</build>
-
<dependencies>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
@@ -105,13 +99,17 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>0.8.7-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.7-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
</dependencies>
-
</project>
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 652e55a..a94c068 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -46,13 +46,13 @@
}
@Override
- public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback
createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws
HyracksDataException {
ITransactionSubsystem txnSubsystem =
txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is
not registered.");
}
@@ -68,4 +68,10 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index a04eb62..c92c760 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -43,13 +43,12 @@
}
@Override
- public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
-
+ public IModificationOperationCallback
createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws
HyracksDataException {
ITransactionSubsystem txnSubsystem =
txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is
not registered.");
}
@@ -65,4 +64,10 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 4081101..17a0cde 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -30,25 +30,26 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class TempDatasetPrimaryIndexModificationOperationCallbackFactory
extends AbstractOperationCallbackFactory implements
- IModificationOperationCallbackFactory {
+public class TempDatasetPrimaryIndexModificationOperationCallbackFactory
extends AbstractOperationCallbackFactory
+ implements IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId
jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation
indexOp, byte resourceType) {
+ public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId
jobId, int datasetId,
+ int[] primaryKeyFields, ITransactionSubsystemProvider
txnSubsystemProvider, IndexOperation indexOp,
+ byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback
createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws
HyracksDataException {
ITransactionSubsystem txnSubsystem =
txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is
not registered.");
}
@@ -64,4 +65,10 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index ce10bd7..27934a2 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -30,25 +30,26 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class TempDatasetSecondaryIndexModificationOperationCallbackFactory
extends AbstractOperationCallbackFactory implements
- IModificationOperationCallbackFactory {
+public class TempDatasetSecondaryIndexModificationOperationCallbackFactory
extends AbstractOperationCallbackFactory
+ implements IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId
jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation
indexOp, byte resourceType) {
+ public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId
jobId, int datasetId,
+ int[] primaryKeyFields, ITransactionSubsystemProvider
txnSubsystemProvider, IndexOperation indexOp,
+ byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback
createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws
HyracksDataException {
ITransactionSubsystem txnSubsystem =
txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex)
indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is
not registered.");
}
@@ -64,4 +65,10 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public IModificationOperationCallback
createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 903ddc8..6d92534 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -25,8 +25,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
@@ -42,14 +46,14 @@
private static final String ROOT_METADATA_FILE_NAME_PREFIX =
".asterix_root_metadata";
private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
private static final String METADATA_FILE_NAME = ".metadata";
- private Map<String, LocalResource> name2ResourceMap = new HashMap<String,
LocalResource>();
- private Map<Long, LocalResource> id2ResourceMap = new HashMap<Long,
LocalResource>();
- private final int numIODevices;
+ private final Cache<String, LocalResource> resourcesCache;
+ private final String nodeId;
+ private static final int MAX_CACHED_RESOURCES = 100;
- public PersistentLocalResourceRepository(List<IODeviceHandle> devices)
throws HyracksDataException {
- numIODevices = devices.size();
- this.mountPoints = new String[numIODevices];
- for (int i = 0; i < numIODevices; i++) {
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices,
String nodeId) throws HyracksDataException {
+ mountPoints = new String[devices.size()];
+ this.nodeId = nodeId;
+ for (int i = 0; i < mountPoints.length; i++) {
String mountPoint = devices.get(i).getPath().getPath();
File mountPointDir = new File(mountPoint);
if (!mountPointDir.exists()) {
@@ -61,6 +65,8 @@
mountPoints[i] = new String(mountPoint);
}
}
+
+ resourcesCache =
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
}
private String prepareRootMetaDataFileName(String mountPoint, String
nodeId, int ioDeviceId) {
@@ -75,7 +81,7 @@
if (isNewUniverse) {
//#. if the rootMetadataFile doesn't exist, create it and return.
- for (int i = 0; i < numIODevices; i++) {
+ for (int i = 0; i < mountPoints.length; i++) {
String rootMetadataFileName =
prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+ ROOT_METADATA_FILE_NAME_PREFIX;
File rootMetadataFile = new File(rootMetadataFileName);
@@ -115,78 +121,13 @@
return;
}
- FilenameFilter filter = new FilenameFilter() {
- public boolean accept(File dir, String name) {
- if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
- return true;
- } else {
- return false;
- }
- }
- };
-
long maxResourceId = 0;
- for (int i = 0; i < numIODevices; i++) {
- String rootMetadataFileName =
prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
- + ROOT_METADATA_FILE_NAME_PREFIX;
- File rootMetadataFile = new File(rootMetadataFileName);
- //#. if the rootMetadataFile exists, read it and set this.rootDir.
- LocalResource rootLocalResource =
readLocalResource(rootMetadataFile);
- String mountedRootDir = (String)
rootLocalResource.getResourceObject();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource
repository is " + mountedRootDir);
- }
-
- //#. load all local resources.
- File rootDirFile = new File(mountedRootDir);
- if (!rootDirFile.exists()) {
- //rootDir may not exist if this node is not the metadata node
and doesn't have any user data.
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource
repository doesn't exist: there is no local resource.");
- LOGGER.info("Completed the initialization of the local
resource repository");
- }
- continue;
- }
-
- File[] dataverseFileList = rootDirFile.listFiles();
- if (dataverseFileList == null) {
- throw new HyracksDataException("Metadata dataverse doesn't
exist.");
- }
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles =
ioDeviceFile.listFiles(filter);
- if (metadataFiles != null) {
- for (File metadataFile :
metadataFiles) {
- LocalResource
localResource = readLocalResource(metadataFile);
-
id2ResourceMap.put(localResource.getResourceId(), localResource);
- name2ResourceMap
-
.put(localResource.getResourceName(), localResource);
- maxResourceId =
Math.max(localResource.getResourceId(),
- maxResourceId);
- if
(LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("loaded
local resource - [id: "
- +
localResource.getResourceId() + ", name: "
- +
localResource.getResourceName() + "]");
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ //TODO this could be optimized by storing the current max resource id
in the checkpoint file
+ Map<Long, LocalResource> resourceMap = loadAndGetAllResources();
+ for (Long resourceId : resourceMap.keySet()) {
+ maxResourceId = Math.max(maxResourceId, resourceId);
}
+
resourceIdFactory.initId(maxResourceId + 1);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("The resource id factory is intialized with the value:
" + (maxResourceId + 1));
@@ -195,33 +136,36 @@
}
@Override
- public LocalResource getResourceById(long id) throws HyracksDataException {
- return id2ResourceMap.get(id);
- }
-
- @Override
public LocalResource getResourceByName(String name) throws
HyracksDataException {
- return name2ResourceMap.get(name);
+ LocalResource resource = resourcesCache.getIfPresent(name);
+ if (resource == null) {
+ File resourceFile = getLocalResourceFileByName(name);
+ if (resourceFile.exists()) {
+ resource = readLocalResource(resourceFile);
+ resourcesCache.put(name, resource);
+ return readLocalResource(resourceFile);
+ }
+ }
+ return resource;
}
@Override
public synchronized void insert(LocalResource resource) throws
HyracksDataException {
- long id = resource.getResourceId();
+ File resourceFile = new File(getFileName(resource.getResourceName(),
resource.getResourceId()));
- if (id2ResourceMap.containsKey(id)) {
+ if (resourceFile.exists()) {
throw new HyracksDataException("Duplicate resource");
}
if (resource.getResourceId() != ROOT_LOCAL_RESOURCE_ID) {
- id2ResourceMap.put(id, resource);
- name2ResourceMap.put(resource.getResourceName(), resource);
+ resourcesCache.put(resource.getResourceName(), resource);
}
FileOutputStream fos = null;
ObjectOutputStream oosToFos = null;
try {
- fos = new FileOutputStream(getFileName(resource.getResourceName(),
resource.getResourceId()));
+ fos = new FileOutputStream(resourceFile);
oosToFos = new ObjectOutputStream(fos);
oosToFos.writeObject(resource);
oosToFos.flush();
@@ -246,40 +190,89 @@
}
@Override
- public synchronized void deleteResourceById(long id) throws
HyracksDataException {
- LocalResource resource = id2ResourceMap.get(id);
- if (resource == null) {
+ public synchronized void deleteResourceByName(String name) throws
HyracksDataException {
+ File resourceFile = getLocalResourceFileByName(name);
+ if (resourceFile.exists()) {
+ resourceFile.delete();
+ resourcesCache.invalidate(name);
+ } else {
throw new HyracksDataException("Resource doesn't exist");
}
- id2ResourceMap.remove(id);
- name2ResourceMap.remove(resource.getResourceName());
- File file = new File(getFileName(resource.getResourceName(),
resource.getResourceId()));
- file.delete();
}
- @Override
- public synchronized void deleteResourceByName(String name) throws
HyracksDataException {
- LocalResource resource = name2ResourceMap.get(name);
- if (resource == null) {
- throw new HyracksDataException("Resource doesn't exist");
+ private static File getLocalResourceFileByName(String resourceName) {
+ return new File(resourceName + File.separator + METADATA_FILE_NAME);
+ }
+
+ public HashMap<Long, LocalResource> loadAndGetAllResources() throws
HyracksDataException {
+ HashMap<Long, LocalResource> resourcesMap = new HashMap<Long,
LocalResource>();
+
+ for (int i = 0; i < mountPoints.length; i++) {
+ String rootMetadataFileName =
prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+ + ROOT_METADATA_FILE_NAME_PREFIX;
+ File rootMetadataFile = new File(rootMetadataFileName);
+ if (!rootMetadataFile.exists()) {
+ continue;
+ }
+ //#. if the rootMetadataFile exists, read it and set it as
mounting point root
+ LocalResource rootLocalResource =
readLocalResource(rootMetadataFile);
+ String mountedRootDir = (String)
rootLocalResource.getResourceObject();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("The root directory of the local resource
repository is " + mountedRootDir);
+ }
+
+ File rootDirFile = new File(mountedRootDir);
+ if (!rootDirFile.exists()) {
+ //rootDir may not exist if this node is not the metadata node
and doesn't have any user data.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("The root directory of the local resource
repository doesn't exist: there is no local resource.");
+ LOGGER.info("Completed the initialization of the local
resource repository");
+ }
+ continue;
+ }
+
+ //#. load all local resources.
+ File[] dataverseFileList = rootDirFile.listFiles();
+ if (dataverseFileList == null) {
+ throw new HyracksDataException("Metadata dataverse doesn't
exist.");
+ }
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] ioDevicesList = indexFile.listFiles();
+ if (ioDevicesList != null) {
+ for (File ioDeviceFile : ioDevicesList) {
+ if (ioDeviceFile.isDirectory()) {
+ File[] metadataFiles =
ioDeviceFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile :
metadataFiles) {
+ LocalResource
localResource = readLocalResource(metadataFile);
+
resourcesMap.put(localResource.getResourceId(), localResource);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
- id2ResourceMap.remove(resource.getResourceId());
- name2ResourceMap.remove(name);
- File file = new File(getFileName(resource.getResourceName(),
resource.getResourceId()));
- file.delete();
+
+ return resourcesMap;
}
@Override
public List<LocalResource> getAllResources() throws HyracksDataException {
- List<LocalResource> resources = new ArrayList<LocalResource>();
- for (LocalResource resource : id2ResourceMap.values()) {
- resources.add(resource);
- }
- return resources;
+ HashMap<Long, LocalResource> resourcesMap = loadAndGetAllResources();
+ return new ArrayList<LocalResource>(resourcesMap.values());
}
private String getFileName(String baseDir, long resourceId) {
-
if (resourceId == ROOT_LOCAL_RESOURCE_ID) {
return baseDir;
} else {
@@ -319,4 +312,24 @@
}
}
}
+
+ @Override
+ public synchronized void deleteResourceById(long id) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalResource getResourceById(long id) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ private static final FilenameFilter METADATA_FILES_FILTER = new
FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
}
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 1e6767b..74b92e0 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -20,14 +20,16 @@
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
public class PersistentLocalResourceRepositoryFactory implements
ILocalResourceRepositoryFactory {
- private IIOManager ioManager;
+ private final IIOManager ioManager;
+ private final String nodeId;
- public PersistentLocalResourceRepositoryFactory(IIOManager ioManager) {
+ public PersistentLocalResourceRepositoryFactory(IIOManager ioManager,
String nodeId) {
this.ioManager = ioManager;
+ this.nodeId = nodeId;
}
@Override
public ILocalResourceRepository createRepository() throws
HyracksDataException {
- return new PersistentLocalResourceRepository(ioManager.getIODevices());
+ return new PersistentLocalResourceRepository(ioManager.getIODevices(),
nodeId);
}
}
\ No newline at end of file
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 8b5e026..7cfc50a 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -35,9 +35,9 @@
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
- AsterixThreadExecutor ate = new
AsterixThreadExecutor(Executors.defaultThreadFactory());
+ AsterixThreadExecutor ate = new
AsterixThreadExecutor(Executors.defaultThreadFactory());
IIndexLifecycleManager ilm = new IndexLifecycleManager();
-
+
@Override
public AsterixThreadExecutor getThreadExecutor() {
return ate;
@@ -134,6 +134,40 @@
public List<IIndex> getOpenIndexes() {
throw new UnsupportedOperationException();
}
-
+
+ @Override
+ public void register(String resourceName, IIndex index) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void open(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean supportsPersistentLocalResources() {
+ return false;
+ }
+
+ @Override
+ public void close(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIndex getIndex(String resourceName) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unregister(String resourceName) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIndex getIndex(int datasetID, long resourceID) throws
HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
}
\ No newline at end of file
diff --git
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index d415935..8996566 100644
---
a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++
b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -47,6 +47,7 @@
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.LogType;
+import
edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import
edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import
edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
@@ -78,7 +79,7 @@
private final LogManager logMgr;
private final int checkpointHistory;
private final long SHARP_CHECKPOINT_LSN = -1;
-
+ private Map<Long, LocalResource> resourcesMap;
/**
* A file at a known location that contains the LSN of the last log record
* traversed doing a successful checkpoint.
@@ -245,6 +246,7 @@
IIndexLifecycleManager indexLifecycleManager =
appRuntimeContext.getIndexLifecycleManager();
ILocalResourceRepository localResourceRepository =
appRuntimeContext.getLocalResourceRepository();
+ resourcesMap = ((PersistentLocalResourceRepository)
localResourceRepository).loadAndGetAllResources();
//#. set log reader to the lowWaterMarkLsn again.
logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -269,12 +271,12 @@
}
if (foundWinner) {
resourceId = logRecord.getResourceId();
- localResource =
localResourceRepository.getResourceById(resourceId);
+ localResource = resourcesMap.get(resourceId);
//get index instance from IndexLifeCycleManager
//if index is not registered into
IndexLifeCycleManager,
//create the index using LocalMetadata stored in
LocalResourceRepository
- index = (ILSMIndex)
indexLifecycleManager.getIndex(resourceId);
+ index = (ILSMIndex)
indexLifecycleManager.getIndex(localResource.getResourceName());
if (index == null) {
/*******************************************************************
@@ -300,8 +302,8 @@
localResourceMetadata = (ILocalResourceMetadata)
localResource.getResourceObject();
index =
localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(),
localResource.getPartition());
- indexLifecycleManager.register(resourceId, index);
- indexLifecycleManager.open(resourceId);
+
indexLifecycleManager.register(localResource.getResourceName(), index);
+
indexLifecycleManager.open(localResource.getResourceName());
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = (ILSMIndex) index;
@@ -338,7 +340,7 @@
//close all indexes
Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
- indexLifecycleManager.close(r);
+ indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
}
logReader.close();
@@ -352,11 +354,12 @@
}
@Override
- public synchronized long checkpoint(boolean isSharpCheckpoint, long
nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException {
+ public synchronized long checkpoint(boolean isSharpCheckpoint, long
nonSharpCheckpointTargetLSN)
+ throws ACIDException, HyracksDataException {
long minMCTFirstLSN;
boolean nonSharpCheckpointSucceeded = false;
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
@@ -368,7 +371,8 @@
// right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- DatasetLifecycleManager datasetLifecycleManager =
(DatasetLifecycleManager)txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+ DatasetLifecycleManager datasetLifecycleManager =
(DatasetLifecycleManager) txnSubsystem
+
.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
//#. flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
@@ -378,11 +382,10 @@
} else {
minMCTFirstLSN = getMinFirstLSN();
-
- if(minMCTFirstLSN >= nonSharpCheckpointTargetLSN){
+
+ if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
nonSharpCheckpointSucceeded = true;
- }
- else{
+ } else {
//flush datasets with indexes behind target checkpoint LSN
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
}
@@ -435,10 +438,10 @@
}
}
- if(nonSharpCheckpointSucceeded){
+ if (nonSharpCheckpointSucceeded) {
logMgr.deleteOldLogFiles(minMCTFirstLSN);
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
@@ -447,9 +450,9 @@
return minMCTFirstLSN;
}
- public long getMinFirstLSN() throws HyracksDataException
- {
- IIndexLifecycleManager indexLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+ public long getMinFirstLSN() throws HyracksDataException {
+ IIndexLifecycleManager indexLifecycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getIndexLifecycleManager();
List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
long firstLSN;
//the min first lsn can only be the current append or smaller
@@ -459,8 +462,9 @@
for (IIndex index : openIndexList) {
- AbstractLSMIOOperationCallback ioCallback =
(AbstractLSMIOOperationCallback)((ILSMIndex) index).getIOOperationCallback();
- if(!((AbstractLSMIndex)index).isCurrentMutableComponentEmpty()
|| ioCallback.hasPendingFlush()){
+ AbstractLSMIOOperationCallback ioCallback =
(AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+ .getIOOperationCallback();
+ if (!((AbstractLSMIndex)
index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
minFirstLSN = Math.min(minFirstLSN, firstLSN);
}
@@ -590,6 +594,7 @@
ILogReader logReader = logMgr.getLogReader(false);
logReader.initializeScan(firstLSN);
ILogRecord logRecord = null;
+
while (currentLSN < lastLSN) {
logRecord = logReader.next();
if (logRecord == null) {
@@ -706,7 +711,7 @@
private void undo(ILogRecord logRecord) {
try {
ILSMIndex index = (ILSMIndex)
txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getResourceId());
+ .getIndex(logRecord.getDatasetId(),
logRecord.getResourceId());
ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -724,7 +729,7 @@
private void redo(ILogRecord logRecord) {
try {
ILSMIndex index = (ILSMIndex)
txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getResourceId());
+ .getIndex(logRecord.getDatasetId(),
logRecord.getResourceId());
ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/344
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I48b9260a3280750145f6ddb3783673a299055910
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>