abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][RT] Abort CC jobs on first time registration ......................................................................
[NO ISSUE][RT] Abort CC jobs on first time registration - user model changes: no - storage format changes: no - interface changes: yes - the application context is in charge of providing and renewing the CC client connection. details: - This change allows revival of the CC if it gets killed. - Jobs that were started by this CC are aborted and cleaned up on all NCs upon first-time registration. - Client connections are repaired on NCs when a dead connection is detected. Change-Id: If755b7131bdc91790ed28be66f0c61b51f28c2fa Reviewed-on: https://asterix-gerrit.ics.uci.edu/2026 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java R hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java 28 files changed, 323 insertions(+), 158 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index c38b3fc..3912bd5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ -24,6 +24,8 @@ import java.io.PrintWriter; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; @@ -33,8 +35,10 @@ import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.ipc.exceptions.IPCException; public class AbstractQueryApiServlet extends AbstractServlet { + private static final Logger LOGGER = Logger.getLogger(AbstractQueryApiServlet.class.getName()); protected final IApplicationContext appCtx; public enum ResultFields { @@ -99,9 +103,19 @@ } protected IHyracksDataset getHyracksDataset() throws Exception { // NOSONAR - synchronized (ctx) { - IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); - if (hds == null) { + try { + return doGetHyracksDataset(); + } catch (IPCException e) { + LOGGER.log(Level.WARNING, "Failed getting hyracks dataset 
connection. Resetting hyracks connection.", e); + ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc()); + return doGetHyracksDataset(); + } + } + + protected IHyracksDataset doGetHyracksDataset() throws Exception { + IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); + if (hds == null) { + synchronized (ctx) { hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR); if (hds == null) { hds = new HyracksDataset(getHyracksClientConnection(), @@ -109,18 +123,16 @@ ctx.put(HYRACKS_DATASET_ATTR, hds); } } - return hds; } + return hds; } protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR - synchronized (ctx) { - final IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); - if (hcc == null) { - throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); - } - return hcc; + IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); + if (hcc == null) { + throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR); } + return hcc; } protected static UUID printRequestId(PrintWriter pw) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index da06dd1..616c22e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.INCMessageBroker; 
import org.apache.asterix.common.messaging.api.MessageFuture; @@ -45,6 +46,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.ipc.exceptions.IPCException; @@ -64,9 +66,9 @@ } @Override - protected void executeStatement(String statementsText, - SessionOutput sessionOutput, IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, - RequestParameters param, long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception { + protected void executeStatement(String statementsText, SessionOutput sessionOutput, + IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, + long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception { // Running on NC -> send 'execute' message to CC INCServiceContext ncCtx = (INCServiceContext) serviceCtx; INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker(); @@ -83,10 +85,9 @@ if (param.timeout != null) { timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout)); } - ExecuteStatementRequestMessage requestMsg = - new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, - statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl, - optionalParameters); + ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), + responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery, + param.clientContextID, handleUrl, optionalParameters); outExecStartEnd[0] = System.nanoTime(); ncMb.sendMessageToCC(requestMsg); try { @@ -148,7 +149,8 @@ @Override protected HttpResponseStatus 
handleExecuteStatementException(Throwable t) { - if (t instanceof IPCException || t instanceof TimeoutException) { + if (t instanceof TimeoutException + || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) { GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t); return HttpResponseStatus.SERVICE_UNAVAILABLE; } else { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 901aff8..cce5099 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -61,7 +61,6 @@ IHyracksDataset hds = getHyracksDataset(); ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId()); - try { DatasetJobRecord.Status status = resultReader.getStatus(); @@ -98,7 +97,7 @@ ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null); } catch (HyracksDataException e) { final int errorCode = e.getErrorCode(); - if (ErrorCode.NO_RESULTSET == errorCode) { + if (ErrorCode.NO_RESULT_SET == errorCode) { LOGGER.log(Level.INFO, "No results for: \"" + strHandle + "\""); response.setStatus(HttpResponseStatus.NOT_FOUND); return; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index eedc8ec..3e0f2c6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -76,10 +76,14 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.client.ClusterControllerInfo; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; +import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -119,14 +123,11 @@ private IBufferCache bufferCache; private ITransactionSubsystem txnSubsystem; private IMetadataNode metadataNodeStub; - private ILSMIOOperationScheduler lsmIOScheduler; private PersistentLocalResourceRepository localResourceRepository; private IIOManager ioManager; private boolean isShuttingdown; - private ActiveManager activeManager; - private IReplicationChannel replicationChannel; private IReplicationManager replicationManager; private IRemoteRecoveryManager remoteRecoveryManager; @@ -134,6 +135,7 @@ private final ILibraryManager libraryManager; private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; + private IHyracksClientConnection hcc; public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions) throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException, @@ -485,4 +487,22 @@ public INCServiceContext getServiceContext() { return ncServiceContext; } + + @Override + public IHyracksClientConnection getHcc() throws HyracksDataException { + if (hcc == null || 
!hcc.isConnected()) { + synchronized (this) { + if (hcc == null || !hcc.isConnected()) { + try { + NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService(); + ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo(); + hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } + } + return hcc; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index e8f63b4..3d7f870 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -48,9 +48,6 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; -import org.apache.hyracks.api.client.ClusterControllerInfo; -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; @@ -81,7 +78,7 @@ @Override public void init(IServiceContext serviceCtx) throws Exception { - this.ncServiceCtx = (INCServiceContext) serviceCtx; + ncServiceCtx = (INCServiceContext) serviceCtx; ncServiceCtx.setThreadFactory( new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager())); } @@ -103,7 +100,7 @@ System.setProperty("java.rmi.server.hostname", (controllerService).getConfiguration().getClusterPublicAddress()); } - runtimeContext = new 
NCAppRuntimeContext(this.ncServiceCtx, getExtensions()); + runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions()); MetadataProperties metadataProperties = runtimeContext.getMetadataProperties(); if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) { if (LOGGER.isLoggable(Level.INFO)) { @@ -115,8 +112,8 @@ MessagingProperties messagingProperties = runtimeContext.getMessagingProperties(); IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); this.ncServiceCtx.setMessageBroker(messageBroker); - MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory( - (NCMessageBroker) messageBroker, messagingProperties); + MessagingChannelInterfaceFactory interfaceFactory = + new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties); this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); @@ -228,8 +225,8 @@ String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository()) .getStorageMountingPoints(); for (String ioDevice : ioDevices) { - String tempDatasetsDir = ioDevice + storageDirName + File.separator - + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER; + String tempDatasetsDir = + ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER; File tmpDsDir = new File(tempDatasetsDir); if (tmpDsDir.exists()) { IoUtil.delete(tmpDsDir); @@ -306,11 +303,5 @@ int ioDeviceIndex = Math.abs(StoragePathUtil.getPartitionNumFromRelativePath(relPath) % devices.size()); return devices.get(ioDeviceIndex); }; - } - - protected IHyracksClientConnection getHcc() throws Exception { - NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService(); - ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo(); - return new 
HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort()); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index e1840d3..4e30d54 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -30,6 +30,8 @@ import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.library.ILibraryManager; import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IApplicationContext { @@ -56,7 +58,17 @@ /** * @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager} */ - public ILibraryManager getLibraryManager(); + ILibraryManager getLibraryManager(); + /** + * @return the service context + */ IServiceContext getServiceContext(); + + /** + * @return a connected instance of {@link IHyracksClientConnection} + * @throws HyracksDataException + * if connection couldn't be established to cluster controller + */ + IHyracksClientConnection getHcc() throws HyracksDataException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 20f685a..690d1fd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.replication.IFaultToleranceStrategy; import 
org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.storage.common.IStorageManager; @@ -69,11 +68,6 @@ * @return the active notification handler at Cluster controller */ IJobLifecycleListener getActiveNotificationHandler(); - - /** - * @return a new instance of {@link IHyracksClientConnection} - */ - IHyracksClientConnection getHcc(); /** * @return the cluster wide resource id manager diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index e4cc7f4..855031e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -45,7 +45,9 @@ import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.storage.common.IStorageManager; @@ -155,7 +157,18 @@ } @Override - public IHyracksClientConnection getHcc() { + public IHyracksClientConnection getHcc() throws HyracksDataException { + if (!hcc.isConnected()) { + synchronized (this) { + if (!hcc.isConnected()) { + try { + hcc = new HyracksConnection(hcc.getHost(), hcc.getPort()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } + } return hcc; } diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 0142c7d..0ded84f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -70,8 +70,8 @@ @Override public void cancelJob(JobId jobId) throws Exception { - HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction( - jobId); + HyracksClientInterfaceFunctions.CancelJobFunction cjf = + new HyracksClientInterfaceFunctions.CancelJobFunction(jobId); rpci.call(ipcHandle, cjf); } @@ -84,8 +84,8 @@ @Override public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { - HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction( - deploymentId, acggfBytes, jobFlags); + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags); return (JobId) rpci.call(ipcHandle, sjf); } @@ -165,8 +165,8 @@ } } if (ipcHandle.isConnected()) { - throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS - + " seconds"); + throw new IPCException( + "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds"); } } @@ -181,6 +181,11 @@ public String getThreadDump(String node) throws Exception { HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = new HyracksClientInterfaceFunctions.ThreadDumpFunction(node); - return (String)rpci.call(ipcHandle, tdf); + return (String) rpci.call(ipcHandle, tdf); + } + + @Override + public boolean 
isConnected() { + return ipcHandle.isConnected(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 75cbf61..e979da6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -44,7 +44,6 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.api.util.JavaSerializationUtils; -import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.RPCInterface; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; @@ -57,6 +56,8 @@ */ public final class HyracksConnection implements IHyracksClientConnection { private final String ccHost; + + private final int ccPort; private final IPCSystem ipc; @@ -77,11 +78,11 @@ */ public HyracksConnection(String ccHost, int ccPort) throws Exception { this.ccHost = ccHost; + this.ccPort = ccPort; RPCInterface rpci = new RPCInterface(); ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); ipc.start(); - IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort)); - this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci); + hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci); ccInfo = hci.getClusterControllerInfo(); } @@ -124,6 +125,7 @@ return hci.startJob(jobId); } + @Override public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); } @@ -132,6 +134,7 @@ 
return hci.distributeJob(JavaSerializationUtils.serialize(acggf)); } + @Override public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception { return hci.getDatasetDirectoryServiceInfo(); } @@ -242,4 +245,19 @@ public String getThreadDump(String node) throws Exception { return hci.getThreadDump(node); } + + @Override + public String getHost() { + return ccHost; + } + + @Override + public int getPort() { + return ccPort; + } + + @Override + public boolean isConnected() { + return hci.isConnected(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 0956d85..0189135 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -44,7 +44,7 @@ * @return {@link JobStatus} * @throws Exception */ - public JobStatus getJobStatus(JobId jobId) throws Exception; + JobStatus getJobStatus(JobId jobId) throws Exception; /** * Gets detailed information about the specified Job. @@ -54,7 +54,7 @@ * @return {@link JobStatus} * @throws Exception */ - public JobInfo getJobInfo(JobId jobId) throws Exception; + JobInfo getJobInfo(JobId jobId) throws Exception; /** * Cancel the job that has the given job id. @@ -63,7 +63,7 @@ * the JobId of the Job * @throws Exception */ - public void cancelJob(JobId jobId) throws Exception; + void cancelJob(JobId jobId) throws Exception; /** * Start the specified Job. @@ -72,7 +72,7 @@ * Job Specification * @throws Exception */ - public JobId startJob(JobSpecification jobSpec) throws Exception; + JobId startJob(JobSpecification jobSpec) throws Exception; /** * Start the specified Job. 
@@ -83,7 +83,7 @@ * Flags * @throws Exception */ - public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; + JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; /** * Distribute the specified Job. @@ -94,7 +94,7 @@ * Flags * @throws Exception */ - public JobId distributeJob(JobSpecification jobSpec) throws Exception; + JobId distributeJob(JobSpecification jobSpec) throws Exception; /** * Destroy the distributed graph for a pre-distributed job @@ -103,7 +103,7 @@ * The id of the predistributed job * @throws Exception */ - public JobId destroyJob(JobId jobId) throws Exception; + JobId destroyJob(JobId jobId) throws Exception; /** * Used to run a pre-distributed job by id (the same JobId will be returned) @@ -112,7 +112,7 @@ * The id of the predistributed job * @throws Exception */ - public JobId startJob(JobId jobId) throws Exception; + JobId startJob(JobId jobId) throws Exception; /** * Start the specified Job. @@ -123,7 +123,7 @@ * Flags * @throws Exception */ - public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception; + JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception; /** * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress @@ -131,7 +131,7 @@ * @return {@link NetworkAddress} * @throws Exception */ - public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; + NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; /** * Waits until the specified job has completed, either successfully or has @@ -141,8 +141,7 @@ * JobId of the Job * @throws Exception */ - public void waitForCompletion(JobId jobId) throws Exception; - + void waitForCompletion(JobId jobId) throws Exception; /** * Deploy the user-defined jars to the cluster @@ -150,7 +149,7 @@ * @param jars * a list of user-defined jars */ - public DeploymentId 
deployBinary(List<String> jars) throws Exception; + DeploymentId deployBinary(List<String> jars) throws Exception; /** * undeploy a certain deployment @@ -158,7 +157,7 @@ * @param deploymentId * the id for the deployment to be undeployed */ - public void unDeployBinary(DeploymentId deploymentId) throws Exception; + void unDeployBinary(DeploymentId deploymentId) throws Exception; /** * Start the specified Job. @@ -169,7 +168,7 @@ * Job Specification * @throws Exception */ - public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception; + JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception; /** * Start the specified Job. @@ -182,8 +181,7 @@ * Flags * @throws Exception */ - public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) - throws Exception; + JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; /** * Start the specified Job. @@ -196,27 +194,45 @@ * Flags * @throws Exception */ - public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, - EnumSet<JobFlag> jobFlags) throws Exception; + JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) + throws Exception; /** * Shuts down all NCs and then the CC. 
+ * * @param terminateNCService */ - public void stopCluster(boolean terminateNCService) throws Exception; + void stopCluster(boolean terminateNCService) throws Exception; /** * Get details of specified node as JSON object + * * @param nodeId - * id the subject node + * id the subject node * @param includeStats - * @param includeConfig @return serialized JSON containing the node details + * @param includeConfig + * @return serialized JSON containing the node details * @throws Exception */ - public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; + String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; /** * Gets thread dump from the specified node as a serialized JSON string */ - public String getThreadDump(String node) throws Exception; + String getThreadDump(String node) throws Exception; + + /** + * @return true if the connection is alive, false otherwise + */ + boolean isConnected(); + + /** + * @return the hostname of the cluster controller + */ + String getHost(); + + /** + * @return the port of the cluster controller + */ + int getPort(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 1afbe9e..9cebd3e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -67,4 +67,6 @@ public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; public String getThreadDump(String node) throws Exception; + + public boolean isConnected(); } diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 68e7cd1..4bb2869 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -57,7 +57,7 @@ public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21; public static final int DUPLICATE_DISTRIBUTED_JOB = 22; public static final int DISTRIBUTED_JOB_FAILURE = 23; - public static final int NO_RESULTSET = 24; + public static final int NO_RESULT_SET = 24; public static final int JOB_CANCELED = 25; public static final int NODE_FAILED = 26; public static final int FILE_IS_NOT_DIRECTORY = 27; diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java index fdac7f1..31fd379 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java @@ -93,7 +93,7 @@ try { return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId); } catch (HyracksDataException e) { - if (e.getErrorCode() != ErrorCode.NO_RESULTSET) { + if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) { LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + jobId, e); } } catch (Exception e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java index 8400a59..7be6524 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java @@ -28,12 +28,12 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.resource.NodeCapacity; -import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.heartbeat.HeartbeatData; import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo; +import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -41,7 +41,7 @@ public class NodeControllerState { private static final int RRD_SIZE = 720; - private final INodeController nodeController; + private final NodeControllerRemoteProxy nodeController; private final NCConfig ncConfig; @@ -145,7 +145,7 @@ private NodeCapacity capacity; - public NodeControllerState(INodeController nodeController, NodeRegistration reg) { + public NodeControllerState(NodeControllerRemoteProxy nodeController, NodeRegistration reg) { this.nodeController = nodeController; ncConfig = reg.getNCConfig(); dataPort = reg.getDataPort(); @@ -251,7 +251,7 @@ return lastHeartbeatDuration++; } - public INodeController getNodeController() { + public NodeControllerRemoteProxy getNodeController() { return nodeController; } @@ -279,7 +279,7 @@ return capacity; } - public 
synchronized ObjectNode toSummaryJSON() { + public synchronized ObjectNode toSummaryJSON() { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); o.put("node-id", ncConfig.getNodeId()); @@ -289,7 +289,7 @@ return o; } - public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) { + public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) { ObjectMapper om = new ObjectMapper(); ObjectNode o = om.createObjectNode(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 8cca1e0..2d43d42 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -44,6 +45,9 @@ import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.scheduler.IResourceManager; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.exceptions.IPCException; public class NodeManager implements INodeManager { private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName()); @@ 
-93,6 +97,13 @@ LOGGER.warning( "Node with name " + nodeId + " has already registered; failing the node then re-registering."); removeDeadNode(nodeId); + } else { + try { + IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress()); + ncIPCHandle.send(-1, new AbortCCJobsFunction(), null); + } catch (IPCException e) { + throw HyracksDataException.create(e); + } } LOGGER.warning("adding node to registry"); nodeRegistry.put(nodeId, ncState); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 7a9306c..ca1c91b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -107,7 +107,7 @@ private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) throws HyracksDataException { final DatasetJobRecord djr = getDatasetJobRecord(jobId); if (djr == null) { - throw HyracksDataException.create(ErrorCode.NO_RESULTSET, jobId); + throw HyracksDataException.create(ErrorCode.NO_RESULT_SET, jobId); } return djr; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index bf0846f..5866ba5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -28,7 +28,6 @@ import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.cluster.INodeManager; -import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.NodeParameters; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.ipc.CCNCFunctions; @@ -55,7 +54,8 @@ Map<IOption, Object> ncConfiguration = new HashMap<>(); try { LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id); - INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress()); + NodeControllerRemoteProxy nc = + new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress()); NodeControllerState state = new NodeControllerState(nc, reg); INodeManager nodeManager = ccs.getNodeManager(); nodeManager.addNode(id, state); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java index f51dd06..7117b6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java @@ -18,14 +18,24 @@ */ package org.apache.hyracks.control.cc.work; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + import 
org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.common.work.NoOpCallback; public class RegisterResultPartitionLocationWork extends AbstractWork { + + private static final Logger LOGGER = Logger.getLogger(RegisterResultPartitionLocationWork.class.getName()); + private final ClusterControllerService ccs; private final JobId jobId; @@ -43,8 +53,7 @@ private final NetworkAddress networkAddress; public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, - boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress - networkAddress) { + boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) { this.ccs = ccs; this.jobId = jobId; this.rsId = rsId; @@ -58,17 +67,24 @@ @Override public void run() { try { - ccs.getDatasetDirectoryService() - .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions, - networkAddress); + ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, + partition, nPartitions, networkAddress); } catch (HyracksDataException e) { - throw new RuntimeException(e); + LOGGER.log(Level.WARNING, "Failed to register partition location", e); + // Should fail the job if exists on cc, otherwise, do nothing + JobRun jobRun = ccs.getJobManager().get(jobId); + if (jobRun != null) { + List<Exception> exceptions = new ArrayList<>(); + exceptions.add(e); + jobRun.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE); + } } } @Override public String toString() { - return getName() + ": JobId@" + jobId 
+ " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions - + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult; + return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + + " EmptyResult@" + emptyResult; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java index fa6580e..9b9a3b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java @@ -27,13 +27,19 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.scheduler.IResourceManager; import org.apache.hyracks.control.cc.scheduler.ResourceManager; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.hyracks.ipc.impl.IPCSystem; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; public class NodeManagerTest { @@ -43,9 +49,9 @@ private static final 
String NODE2 = "node2"; @Test - public void testNormal() throws HyracksException { + public void testNormal() throws HyracksException, IPCException { IResourceManager resourceManager = new ResourceManager(); - INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager); + INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager); NodeControllerState ncState1 = mockNodeControllerState(NODE1, false); NodeControllerState ncState2 = mockNodeControllerState(NODE2, false); @@ -68,9 +74,9 @@ } @Test - public void testException() throws HyracksException { + public void testException() throws HyracksException, IPCException { IResourceManager resourceManager = new ResourceManager(); - INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager); + INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager); NodeControllerState ncState1 = mockNodeControllerState(NODE1, true); boolean invalidNetworkAddress = false; @@ -84,6 +90,16 @@ // Verifies that the cluster is empty. verifyEmptyCluster(resourceManager, nodeManager); + } + + private ClusterControllerService mockCcs() throws IPCException { + ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class); + IPCSystem ipcSystem = Mockito.mock(IPCSystem.class); + IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class); + Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem); + Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle); + Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle); + return ccs; } @Test @@ -112,6 +128,7 @@ private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) { NodeControllerState ncState = mock(NodeControllerState.class); + NodeControllerRemoteProxy ncProxy = Mockito.mock(NodeControllerRemoteProxy.class); String ipAddr = invalidIpAddr ? 
"255.255.255:255" : "127.0.0.2"; NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001); NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002); @@ -123,6 +140,7 @@ NCConfig ncConfig = new NCConfig(nodeId); ncConfig.setDataPublicAddress(ipAddr); when(ncState.getNCConfig()).thenReturn(ncConfig); + Mockito.when(ncState.getNodeController()).thenReturn(ncProxy); return ncState; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index 0d46d64..251aed8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -45,8 +45,8 @@ import org.apache.hyracks.control.cc.application.CCServiceContext; import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.cc.cluster.NodeManager; -import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; import org.apache.hyracks.control.common.logs.LogFile; import org.apache.hyracks.control.common.work.NoOpCallback; import org.junit.Assert; @@ -293,7 +293,7 @@ private INodeManager mockNodeManager() { INodeManager nodeManager = mock(NodeManager.class); NodeControllerState ncState = mock(NodeControllerState.class); - INodeController nodeController = mock(INodeController.class); + NodeControllerRemoteProxy nodeController = mock(NodeControllerRemoteProxy.class); when(nodeManager.getNodeControllerState(any())).thenReturn(ncState); when(ncState.getNodeController()).thenReturn(nodeController); return nodeManager; diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index d42c4a8..4797ed7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -87,6 +87,7 @@ NODE_REGISTRATION_RESULT, START_TASKS, ABORT_TASKS, + ABORT_ALL_JOBS, CLEANUP_JOBLET, REPORT_PARTITION_AVAILABILITY, SEND_APPLICATION_MESSAGE, @@ -665,6 +666,16 @@ } } + //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110 + public static class AbortCCJobsFunction extends Function { + private static final long serialVersionUID = 1L; + + @Override + public FunctionId getFunctionId() { + return FunctionId.ABORT_ALL_JOBS; + } + } + public static class DistributeJobFunction extends Function { private static final long serialVersionUID = 1L; @@ -782,7 +793,7 @@ // read task attempt descriptors int tadSize = dis.readInt(); - List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>(); + List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<>(); for (int i = 0; i < tadSize; i++) { TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis); taskDescriptors.add(tad); @@ -790,7 +801,7 @@ //read connector policies int cpSize = dis.readInt(); - Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(); + Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>(); for (int i = 0; i < cpSize; i++) { ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis); IConnectorPolicy policy = 
ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis); @@ -1362,8 +1373,8 @@ int cdid = dis.readInt(); int senderIndex = dis.readInt(); int receiverIndex = dis.readInt(); - PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, - receiverIndex); + PartitionId pid = + new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex); return pid; } @@ -1379,8 +1390,8 @@ int aid = dis.readInt(); int partition = dis.readInt(); int attempt = dis.readInt(); - TaskAttemptId taId = new TaskAttemptId( - new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt); + TaskAttemptId taId = + new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt); return taId; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java index 2815ae1..d4ccbd9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java @@ -21,13 +21,14 @@ import java.net.InetSocketAddress; import java.util.logging.Logger; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.ipc.impl.IPCSystem; public abstract class ControllerRemoteProxy { protected final IPCSystem ipc; - protected final InetSocketAddress inetSocketAddress; + private final InetSocketAddress inetSocketAddress; private final IControllerRemoteProxyIPCEventListener eventListener; private 
IIPCHandle ipcHandle; @@ -36,28 +37,33 @@ } protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, - IControllerRemoteProxyIPCEventListener eventListener) { + IControllerRemoteProxyIPCEventListener eventListener) { this.ipc = ipc; this.inetSocketAddress = inetSocketAddress; - this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener; + this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() { + } : eventListener; } - protected IIPCHandle ensureIpcHandle() throws IPCException { - final boolean first = ipcHandle == null; - if (first || !ipcHandle.isConnected()) { - if (!first) { - getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection"); - eventListener.ipcHandleDisconnected(ipcHandle); - } - ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first)); - if (ipcHandle.isConnected()) { - if (first) { - eventListener.ipcHandleConnected(ipcHandle); - } else { - getLogger().warning("ipcHandle " + ipcHandle + " restored"); - eventListener.ipcHandleRestored(ipcHandle); + protected IIPCHandle ensureIpcHandle() throws HyracksDataException { + try { + final boolean first = ipcHandle == null; + if (first || !ipcHandle.isConnected()) { + if (!first) { + getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection"); + eventListener.ipcHandleDisconnected(ipcHandle); + } + ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first)); + if (ipcHandle.isConnected()) { + if (first) { + eventListener.ipcHandleConnected(ipcHandle); + } else { + getLogger().warning("ipcHandle " + ipcHandle + " restored"); + eventListener.ipcHandleRestored(ipcHandle); + } } } + } catch (IPCException e) { + throw HyracksDataException.create(e); } return ipcHandle; } @@ -65,4 +71,8 @@ protected abstract int getRetries(boolean first); protected abstract Logger getLogger(); + + public InetSocketAddress getAddress() { + return 
inetSocketAddress; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index c416942..1eb1393 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -22,6 +22,7 @@ import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction; import org.apache.hyracks.control.nc.task.ShutdownTask; import org.apache.hyracks.control.nc.task.ThreadDumpTask; +import org.apache.hyracks.control.nc.work.AbortAllJobsWork; import org.apache.hyracks.control.nc.work.AbortTasksWork; import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.CleanupJobletWork; @@ -55,10 +56,9 @@ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload; switch (fn.getFunctionId()) { case SEND_APPLICATION_MESSAGE: - CCNCFunctions.SendApplicationMessageFunction amf = - (CCNCFunctions.SendApplicationMessageFunction) fn; - ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(), - amf.getDeploymentId(), amf.getNodeId())); + CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn; + ncs.getWorkQueue().schedule( + new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId())); return; case START_TASKS: CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn; @@ -69,6 +69,9 @@ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks())); return; + case ABORT_ALL_JOBS: + 
ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs)); + return; case CLEANUP_JOBLET: CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus())); @@ -76,8 +79,8 @@ case REPORT_PARTITION_AVAILABILITY: CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; - ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs, - rpaf.getPartitionId(), rpaf.getNetworkAddress())); + ncs.getWorkQueue().schedule( + new ReportPartitionAvailabilityWork(ncs, rpaf.getPartitionId(), rpaf.getNetworkAddress())); return; case NODE_REGISTRATION_RESULT: CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn; @@ -92,8 +95,7 @@ case DEPLOY_BINARY: CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; - ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), - dbf.getBinaryURLs())); + ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs())); return; case UNDEPLOY_BINARY: diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index b0a702d..f4ec6e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -125,7 +125,7 @@ private final Map<JobId, Joblet> jobletMap; - private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap; + private final Map<JobId, ActivityClusterGraph> preDistributedJobs; private ExecutorService 
executor; @@ -199,7 +199,7 @@ workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new Hashtable<>(); - preDistributedJobActivityClusterGraphMap = new Hashtable<>(); + preDistributedJobs = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(NodeControllerService.class.getName()), id)); @@ -418,27 +418,27 @@ } public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException { - if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) { + if (preDistributedJobs.get(jobId) != null) { throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); } - preDistributedJobActivityClusterGraphMap.put(jobId, acg); + preDistributedJobs.put(jobId, acg); } public void removeActivityClusterGraph(JobId jobId) throws HyracksException { - if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) { + if (preDistributedJobs.get(jobId) == null) { throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); } - preDistributedJobActivityClusterGraphMap.remove(jobId); + preDistributedJobs.remove(jobId); } public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException { - if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) { + if (preDistributedJobs.get(jobId) != null) { throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); } } public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException { - return preDistributedJobActivityClusterGraphMap.get(jobId); + return preDistributedJobs.get(jobId); } public NetworkManager getNetworkManager() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java similarity index 72% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index 4fb4bf6..56100da 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -18,23 +18,23 @@ */ package org.apache.hyracks.control.nc.work; -import java.util.Map; +import java.util.Collection; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.common.work.SynchronizableWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; -public class AbortAllTasksWork extends SynchronizableWork { +public class AbortAllJobsWork extends SynchronizableWork { - private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName()); + private static final Logger LOGGER = Logger.getLogger(AbortAllJobsWork.class.getName()); private final NodeControllerService ncs; - public AbortAllTasksWork(NodeControllerService ncs) { + public AbortAllJobsWork(NodeControllerService ncs) { this.ncs = ncs; } @@ -46,14 +46,16 @@ IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); if (dpm != null) { ncs.getDatasetPartitionManager().abortAllReaders(); + } else { + LOGGER.log(Level.WARNING, 
"DatasetPartitionManager is null on " + ncs.getId()); } - for (Joblet ji : ncs.getJobletMap().values()) { - Map<TaskAttemptId, Task> taskMap = ji.getTaskMap(); - for (Task task : taskMap.values()) { - if (task != null) { - task.abort(); - } + Collection<Joblet> joblets = ncs.getJobletMap().values(); + for (Joblet ji : joblets) { + Collection<Task> tasks = ji.getTaskMap().values(); + for (Task task : tasks) { + task.abort(); } + ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE)); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java index 4a01fdb..caba5f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify; import java.io.File; +import java.lang.reflect.Field; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,6 +33,8 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.cluster.INodeManager; +import org.apache.hyracks.control.cc.cluster.NodeManager; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; @@ -69,8 +72,16 @@ ccRoot.mkdir(); ccConfig.setRootDir(ccRoot.getAbsolutePath()); ClusterControllerService ccBase = new ClusterControllerService(ccConfig); + // 
The spying below is dangerous since it replaces the ClusterControllerService already referenced by many + // objects created in the constructor above cc = Mockito.spy(ccBase); cc.start(); + + // The following code partially fixes the problem created by the spying + INodeManager nodeManager = cc.getNodeManager(); + Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs"); + ccsInNodeManager.setAccessible(true); + ccsInNodeManager.set(nodeManager, cc); NCConfig ncConfig1 = new NCConfig(NC1_ID); ncConfig1.setClusterAddress("localhost"); @@ -79,7 +90,7 @@ ncConfig1.setDataListenAddress("127.0.0.1"); ncConfig1.setResultListenAddress("127.0.0.1"); ncConfig1.setResultSweepThreshold(5000); - ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); + ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); NodeControllerService nc1Base = new NodeControllerService(ncConfig1); nc1 = Mockito.spy(nc1Base); nc1.start(); @@ -91,7 +102,7 @@ ncConfig2.setDataListenAddress("127.0.0.1"); ncConfig2.setResultListenAddress("127.0.0.1"); ncConfig2.setResultSweepThreshold(5000); - ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); + ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); NodeControllerService nc2Base = new NodeControllerService(ncConfig2); nc2 = Mockito.spy(nc2Base); nc2.start(); diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java index aebe2f5..ba1c9a4 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java @@ -27,7 +27,7 @@ 
private final Map<Long, Request> reqMap; public RPCInterface() { - reqMap = new HashMap<Long, RPCInterface.Request>(); + reqMap = new HashMap<>(); } public Object call(IIPCHandle handle, Object request) throws Exception { -- To view, visit https://asterix-gerrit.ics.uci.edu/2026 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If755b7131bdc91790ed28be66f0c61b51f28c2fa Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>