Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2640
Change subject: [ASTERIXDB-2386][CLUS] Allow extension of the global recovery manager ...................................................................... [ASTERIXDB-2386][CLUS] Allow extension of the global recovery manager Allow the Asterix GlobalRecoveryManager to be extended Add an hcc method to reset the deployed job id factory Change-Id: I1213e702a77ededde18ee0b50bc105212f43480d --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java 13 files changed, 146 insertions(+), 15 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/40/2640/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java index 1ba418a..9ad9dd7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java @@ -28,12 +28,14 @@ import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.common.api.ExtensionId; import org.apache.asterix.common.api.IExtension; +import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.compiler.provider.SqlppCompilationProvider; +import org.apache.asterix.hyracks.bootstrap.GlobalRecoveryManager; import org.apache.asterix.om.functions.IFunctionExtensionManager; import org.apache.asterix.om.functions.IFunctionManager; import org.apache.asterix.runtime.functions.FunctionCollection; @@ -53,6 +55,7 @@ private final ILangCompilationProvider aqlCompilationProvider; private final ILangCompilationProvider sqlppCompilationProvider; private final IFunctionManager functionManager; + private final IGlobalRecoveryManager globalRecoveryManager; private transient IStatementExecutorFactory statementExecutorFactory; /** @@ -71,6 +74,7 @@ Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null; Pair<ExtensionId, IFunctionManager> fm = null; IStatementExecutorExtension see = null; + IGlobalRecoveryManager grm = null; if (list != null) { Set<ExtensionId> extensionIds = new HashSet<>(); for (AsterixExtension extensionConf : list) { @@ -89,6 +93,9 @@ sqlppcp = ExtensionUtil.extendLangCompilationProvider(Language.SQLPP, sqlppcp, le); fm = ExtensionUtil.extendFunctionManager(fm, le); break; + case RECOVERY: + grm = ((IGlobalRecoveryExtension) extension).getGlobalRecoveryManager(); + break; default: break; } @@ -99,6 +106,7 @@ this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second; this.functionManager = fm == null ? new FunctionManager(FunctionCollection.createDefaultFunctionCollection()) : fm.second; + this.globalRecoveryManager = grm; } /** @deprecated use getStatementExecutorFactory instead */ @@ -127,6 +135,13 @@ } } + public IGlobalRecoveryManager getGlobalRecoveryManager() { + if (globalRecoveryManager == null) { + return new GlobalRecoveryManager(); + } + return globalRecoveryManager; + } + @Override public IFunctionManager getFunctionManager() { return functionManager; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java new file mode 100644 index 0000000..3c7c3df --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IGlobalRecoveryExtension.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.cc; + +import org.apache.asterix.common.api.IExtension; +import org.apache.asterix.common.cluster.IGlobalRecoveryManager; + +/** + * An interface for extensions of {@code IGlobalRecoveryManager} + */ +public interface IGlobalRecoveryExtension extends IExtension { + + @Override + default ExtensionKind getExtensionKind() { + return ExtensionKind.RECOVERY; + } + + public IGlobalRecoveryManager getGlobalRecoveryManager(); + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index a3ca8b2..522f342 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -55,6 +55,7 @@ import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.config.AsterixExtension; +import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.MetadataProperties; @@ -62,6 +63,7 @@ import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.utils.Servlets; @@ -139,12 +141,13 @@ INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled()); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); - GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); + + List<AsterixExtension> extensions = new ArrayList<>(); + extensions.addAll(getExtensions()); + ccExtensionManager = new CCExtensionManager(extensions); + GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(ccExtensionManager); statementExecutorCtx = new StatementExecutorContext(); appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator); - List<AsterixExtension> extensions = new ArrayList<>(); - extensions.addAll(this.getExtensions()); - ccExtensionManager = new CCExtensionManager(extensions); appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); if (System.getProperty("java.rmi.server.hostname") == null) { @@ -177,8 +180,12 @@ new MetadataLockManager()); } - protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception { - return new GlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider); + protected GlobalRecoveryManager createGlobalRecoveryManager(CCExtensionManager ccExtensionManager) + throws Exception { + GlobalRecoveryManager globalRecoveryManager = + (GlobalRecoveryManager) ccExtensionManager.getGlobalRecoveryManager(); + globalRecoveryManager.create(ccServiceCtx, getHcc(), componentProvider); + return globalRecoveryManager; } protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean replicationEnabled) { @@ -191,8 +198,8 @@ LoggingConfigUtil.defaultIfMissing(GlobalConfig.ASTERIX_LOGGER_NAME, level); } - protected List<AsterixExtension> getExtensions() { - return appCtx.getExtensionProperties().getExtensions(); + protected List<AsterixExtension> getExtensions() throws IOException, AsterixException { + return new ExtensionProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())).getExtensions(); } protected void configureServers() throws Exception { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 3d9b822..3c71232 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -54,17 +54,19 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { private static final Logger LOGGER = LogManager.getLogger(); - protected final IStorageComponentProvider componentProvider; - protected final ICCServiceContext serviceCtx; + protected IStorageComponentProvider componentProvider; + protected ICCServiceContext serviceCtx; protected IHyracksClientConnection hcc; protected volatile boolean recoveryCompleted; protected volatile boolean recovering; - public GlobalRecoveryManager(ICCServiceContext serviceCtx, IHyracksClientConnection hcc, + @Override + public void create(ICCServiceContext serviceCtx, IHyracksClientConnection hcc, IStorageComponentProvider componentProvider) { this.serviceCtx = serviceCtx; this.hcc = hcc; this.componentProvider = componentProvider; + } @Override @@ -126,7 +128,8 @@ throws Exception { // Loop over datasets for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) { - mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse); + mdTxnCtx = recoverDatasets(appCtx, mdTxnCtx, dataverse); + MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse.getDataverseName()); } return mdTxnCtx; } @@ -138,8 +141,8 @@ } } - private MetadataTransactionContext recoverDataset(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx, - Dataverse dataverse) throws Exception { + protected MetadataTransactionContext recoverDatasets(ICcApplicationContext appCtx, + MetadataTransactionContext mdTxnCtx, Dataverse dataverse) throws Exception { if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) { MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse); try { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java index 9551935..7e9879a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IExtension.java @@ -40,7 +40,11 @@ /** * Extends Language Syntax and Algebric Operations */ - LANG + LANG, + /** + * Extends Recovery Capabilities + */ + RECOVERY } /** diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java index a3add90..07109e3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryManager.java @@ -19,11 +19,20 @@ package org.apache.asterix.common.cluster; import org.apache.asterix.common.api.IClusterEventsSubscriber; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; public interface IGlobalRecoveryManager extends IClusterEventsSubscriber { /** + * Sets up the Global Recovery Manager + */ + void create(ICCServiceContext serviceCtx, IHyracksClientConnection hcc, + IStorageComponentProvider componentProvider); + + /** * Starts the global recovery process after the cluster state has changed to ACTIVE. * * @param appCtx the application context diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index 7182f42..cc3125d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -42,6 +42,7 @@ DEPLOY_JOB, UNDEPLOY_JOB, UPSERT_DEPLOYED_JOB, + RESET_DEPLOYED_JOB_ID_FACTORY, CANCEL_JOB, GET_DATASET_DIRECTORY_SERIVICE_INFO, GET_DATASET_RESULT_STATUS, @@ -134,6 +135,25 @@ } } + public static class ResetDeployedJobIdFactoryFunction extends Function { + private static final long serialVersionUID = 1L; + + private final long nextDeployedJobSpecId; + + public ResetDeployedJobIdFactoryFunction(long nextDeployedJobSpecId) { + this.nextDeployedJobSpecId = nextDeployedJobSpecId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.RESET_DEPLOYED_JOB_ID_FACTORY; + } + + public long getNextDeployedJobSpecId() { + return nextDeployedJobSpecId; + } + } + public static class DeployJobSpecFunction extends Function { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 07ca6b0..4681d25 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -106,6 +106,13 @@ } @Override + public void resetDeployedJobIdFactory(long nextDeployedJobSpecId) throws Exception { + HyracksClientInterfaceFunctions.ResetDeployedJobIdFactoryFunction rdjiff = + new HyracksClientInterfaceFunctions.ResetDeployedJobIdFactoryFunction(nextDeployedJobSpecId); + rpci.call(ipcHandle, rdjiff); + } + + @Override public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { HyracksClientInterfaceFunctions.UndeployJobSpecFunction sjf = new HyracksClientInterfaceFunctions.UndeployJobSpecFunction(deployedJobSpecId); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 5b98778..0132024 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -119,6 +119,12 @@ } @Override + public void resetDeployedJobIdFactory(long nextDeployedJobSpecId) throws Exception { + hci.resetDeployedJobIdFactory(nextDeployedJobSpecId); + + } + + @Override public DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception { JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 61d1418..980b802 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -111,6 +111,15 @@ throws Exception; /** + * Update the next id to use for deployed jobs. + * + * @param nextDeployedJobSpecId + * The id that should be used for the next deployed job + * @throws Exception + */ + void resetDeployedJobIdFactory(long nextDeployedJobSpecId) throws Exception; + + /** * Remove the deployed Job Spec * * @param deployedJobSpecId diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 2b92bcd..37c2510 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -48,6 +48,8 @@ public DeployedJobSpecId upsertDeployedJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] acggfBytes) throws Exception; + public void resetDeployedJobIdFactory(long nextDeployedJobSpecId) throws Exception; + public DeployedJobSpecId undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java index 24caa9b..0d54433 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/DeployedJobSpecIdFactory.java @@ -31,4 +31,8 @@ return id.get(); } + public void reset(long nextDeployedJobSpecId) { + id.set(nextDeployedJobSpecId); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index f123c8a..ca3461c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -97,6 +97,15 @@ ccs.getWorkQueue().schedule(new DeployJobSpecWork(ccs, udjsf.getACGGFBytes(), udjsf.getDeployedJobSpecId(), true, new IPCResponder<>(handle, mid))); break; + case RESET_DEPLOYED_JOB_ID_FACTORY: + deployedJobSpecIdFactory.reset(((HyracksClientInterfaceFunctions.ResetDeployedJobIdFactoryFunction) fn) + .getNextDeployedJobSpecId()); + try { + handle.send(mid, deployedJobSpecIdFactory.maxDeployedJobSpecId(), null); + } catch (IPCException e) { + LOGGER.log(Level.WARN, "Error sending response to RESET_DEPLOYED_JOB_ID_FACTORY request", e); + } + break; case UNDEPLOY_JOB: HyracksClientInterfaceFunctions.UndeployJobSpecFunction dsjf = (HyracksClientInterfaceFunctions.UndeployJobSpecFunction) fn; -- To view, visit https://asterix-gerrit.ics.uci.edu/2640 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I1213e702a77ededde18ee0b50bc105212f43480d Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>