Michael Blow has submitted this change and it was merged.

Change subject: Attempt to reconnect IPCHandle on connection failure
......................................................................
Attempt to reconnect IPCHandle on connection failure IPCHandles can become invalid due to network interruption or node crash/restart. Automatically retry connection in event of attempt to use disconnected handle. Change-Id: I069dcd59898021054462c8213fb623df2deec598 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1828 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java 7 files changed, 175 insertions(+), 91 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 79033d8..dc7bad0 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -54,8 +54,8 @@ CCNCFunctions.NodeRegistrationResult result; Map<IOption, Object> ncConfiguration = new HashMap<>(); try { - INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle); - NodeControllerState state = new NodeControllerState(nodeController, reg); + INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress()); + NodeControllerState state = new NodeControllerState(nc, reg); INodeManager nodeManager = ccs.getNodeManager(); nodeManager.addNode(id, state); IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 4eb1732..620033c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -1396,7 +1396,8 @@ int cdid = dis.readInt(); int senderIndex = dis.readInt(); int receiverIndex = dis.readInt(); - PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex); + PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, + receiverIndex); return pid; } @@ -1412,8 +1413,8 @@ int aid = dis.readInt(); int partition = dis.readInt(); int attempt = dis.readInt(); - TaskAttemptId taId = new 
TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), - partition), attempt); + TaskAttemptId taId = new TaskAttemptId( + new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt); return taId; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 83ef32b..98d258f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -18,7 +18,11 @@ */ package org.apache.hyracks.control.common.ipc; +import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*; + +import java.net.InetSocketAddress; import java.util.List; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataflow.TaskAttemptId; @@ -33,141 +37,153 @@ import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; -import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.impl.IPCSystem; -public class ClusterControllerRemoteProxy implements IClusterController { - private final IIPCHandle ipcHandle; +public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController { + private static final Logger LOGGER = Logger.getLogger(ClusterControllerRemoteProxy.class.getName()); - public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) { - this.ipcHandle = ipcHandle; + private final int 
clusterConnectRetries; + + public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries) { + super(ipc, inetSocketAddress); + this.clusterConnectRetries = clusterConnectRetries; + } + + @Override + protected int getRetries(boolean first) { + return first ? clusterConnectRetries : 0; + } + + @Override + protected Logger getLogger() { + return LOGGER; } @Override public void registerNode(NodeRegistration reg) throws Exception { - CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg); - ipcHandle.send(-1, fn, null); + RegisterNodeFunction fn = new RegisterNodeFunction(reg); + ensureIpcHandle().send(-1, fn, null); } @Override public void unregisterNode(String nodeId) throws Exception { - CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId); - ipcHandle.send(-1, fn, null); + UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId); + ensureIpcHandle().send(-1, fn, null); } @Override public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) throws Exception { - CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId, + NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId, nodeId, statistics); - ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions) throws Exception { - CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId, + NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId, exceptions); - ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception { - CCNCFunctions.NotifyJobletCleanupFunction fn = new 
CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId); - ipcHandle.send(-1, fn, null); + NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId); + ensureIpcHandle().send(-1, fn, null); } @Override public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception { - CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(deploymentId, - nodeId, status); - ipcHandle.send(-1, fn, null); + NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId, + status); + ensureIpcHandle().send(-1, fn, null); } @Override public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception { - CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData); - ipcHandle.send(-1, fn, null); + NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData); + ensureIpcHandle().send(-1, fn, null); } @Override public void reportProfile(String id, List<JobProfile> profiles) throws Exception { - CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles); - ipcHandle.send(-1, fn, null); + ReportProfileFunction fn = new ReportProfileFunction(id, profiles); + ensureIpcHandle().send(-1, fn, null); } @Override public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception { - CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction( + RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction( partitionDescriptor); - ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception { - CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction( + RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction( partitionRequest); - 
ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { - CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); - ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, - boolean emptyResult, int partition, int nPartitions, - NetworkAddress networkAddress) throws Exception { - CCNCFunctions.RegisterResultPartitionLocationFunction fn = - new CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, orderedResult, emptyResult, - partition, nPartitions, networkAddress); - ipcHandle.send(-1, fn, null); + boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception { + RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction( + jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress); + ensureIpcHandle().send(-1, fn, null); } @Override public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception { - CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = - new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, partition); - ipcHandle.send(-1, fn, null); + ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction( + jobId, rsId, partition); + ensureIpcHandle().send(-1, fn, null); } @Override public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception { - CCNCFunctions.ReportResultPartitionFailureFunction fn = - new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, rsId, partition); - 
ipcHandle.send(-1, fn, null); + ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction( + jobId, rsId, partition); + ensureIpcHandle().send(-1, fn, null); } @Override public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception { - CCNCFunctions.ReportDistributedJobFailureFunction fn = - new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId); - ipcHandle.send(-1, fn, null); + ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction( + jobId, nodeId); + ensureIpcHandle().send(-1, fn, null); } @Override public void getNodeControllerInfos() throws Exception { - ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null); + ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null); } @Override public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception { - CCNCFunctions.StateDumpResponseFunction fn = - new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId, state); - ipcHandle.send(-1, fn, null); + StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId, + state); + ensureIpcHandle().send(-1, fn, null); } + @Override - public void notifyShutdown(String nodeId) throws Exception{ - CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId); - ipcHandle.send(-1, sdrf, null); + public void notifyShutdown(String nodeId) throws Exception { + ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId); + ensureIpcHandle().send(-1, sdrf, null); } @Override public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception { - CCNCFunctions.ThreadDumpResponseFunction tdrf = - new CCNCFunctions.ThreadDumpResponseFunction(nodeId, requestId, threadDumpJSON); - ipcHandle.send(-1, tdrf, null); + ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId, + threadDumpJSON); + 
ensureIpcHandle().send(-1, tdrf, null); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java new file mode 100644 index 0000000..44b0e4a --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.ipc; + +import java.net.InetSocketAddress; +import java.util.logging.Logger; + +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.hyracks.ipc.impl.IPCSystem; + +public abstract class ControllerRemoteProxy { + protected final IPCSystem ipc; + protected final InetSocketAddress inetSocketAddress; + private IIPCHandle ipcHandle; + + protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) { + this.ipc = ipc; + this.inetSocketAddress = inetSocketAddress; + } + + protected IIPCHandle ensureIpcHandle() throws IPCException { + final boolean first = ipcHandle == null; + if (first || !ipcHandle.isConnected()) { + if (!first) { + getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection"); + } + ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first)); + if (!first && ipcHandle.isConnected()) { + getLogger().warning("ipcHandle " + ipcHandle + " restored"); + } + } + return ipcHandle; + } + + protected abstract int getRetries(boolean first); + + protected abstract Logger getLogger(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 2a8464e..68a5b76 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -18,10 +18,14 @@ */ package org.apache.hyracks.control.common.ipc; +import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*; + +import java.net.InetSocketAddress; 
import java.net.URL; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; @@ -34,89 +38,99 @@ import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; -import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.impl.IPCSystem; -public class NodeControllerRemoteProxy implements INodeController { - private final IIPCHandle ipcHandle; +public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController { + private static final Logger LOGGER = Logger.getLogger(NodeControllerRemoteProxy.class.getName()); - public NodeControllerRemoteProxy(IIPCHandle ipcHandle) { - this.ipcHandle = ipcHandle; + public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) { + super(ipc, inetSocketAddress); + } + + @Override + protected int getRetries(boolean first) { + return 0; + } + + @Override + protected Logger getLogger() { + return LOGGER; } @Override public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags) throws Exception { - CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes, + StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags); - ipcHandle.send(-1, stf, null); + ensureIpcHandle().send(-1, stf, null); } @Override public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception { - CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks); - ipcHandle.send(-1, atf, null); + AbortTasksFunction atf = new 
AbortTasksFunction(jobId, tasks); + ensureIpcHandle().send(-1, atf, null); } @Override public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception { - CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status); - ipcHandle.send(-1, cjf, null); + CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status); + ensureIpcHandle().send(-1, cjf, null); } @Override public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception { - CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction( + ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction( pid, networkAddress); - ipcHandle.send(-1, rpaf, null); + ensureIpcHandle().send(-1, rpaf, null); } @Override public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception { - CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs); - ipcHandle.send(-1, rpaf, null); + DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs); + ensureIpcHandle().send(-1, rpaf, null); } @Override public void undeployBinary(DeploymentId deploymentId) throws Exception { - CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId); - ipcHandle.send(-1, rpaf, null); + UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId); + ensureIpcHandle().send(-1, rpaf, null); } @Override public void distributeJob(JobId jobId, byte[] planBytes) throws Exception { - CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes); - ipcHandle.send(-1, fn, null); + DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes); + ensureIpcHandle().send(-1, fn, null); } @Override public void destroyJob(JobId jobId) throws Exception { - CCNCFunctions.DestroyJobFunction fn = new 
CCNCFunctions.DestroyJobFunction(jobId); - ipcHandle.send(-1, fn, null); + DestroyJobFunction fn = new DestroyJobFunction(jobId); + ensureIpcHandle().send(-1, fn, null); } @Override public void dumpState(String stateDumpId) throws Exception { - CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId); - ipcHandle.send(-1, dsf, null); + StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId); + ensureIpcHandle().send(-1, dsf, null); } @Override public void shutdown(boolean terminateNCService) throws Exception { - CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction(terminateNCService); - ipcHandle.send(-1, sdrf, null); + ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService); + ensureIpcHandle().send(-1, sdrf, null); } @Override public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { - CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); - ipcHandle.send(-1, fn, null); + ensureIpcHandle().send(-1, fn, null); } @Override public void takeThreadDump(String requestId) throws Exception { - CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(requestId); - ipcHandle.send(-1, fn, null); + ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId); + ensureIpcHandle().send(-1, fn, null); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 2fe0e27..0587a55 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -80,7 +80,6 @@ import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.control.nc.resources.memory.MemoryManager; import org.apache.hyracks.control.nc.work.BuildJobProfilesWork; -import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IPCPerformanceCounters; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; @@ -279,10 +278,9 @@ if (messagingNetManager != null) { messagingNetManager.start(); } - IIPCHandle ccIPCHandle = ipc.getHandle( + this.ccs = new ClusterControllerRemoteProxy(ipc, new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()), ncConfig.getClusterConnectRetries()); - this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle); HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()]; for (int i = 0; i < gcInfos.length; ++i) { gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName()); diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index fe2bcae..9efd70e 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -99,7 +99,7 @@ while (true) { synchronized (this) { handle = ipcHandleMap.get(remoteAddress); - if (handle == null) { + if (handle == null || !handle.isConnected()) { handle = new IPCHandle(system, 
remoteAddress); pendingConnections.add(handle); networkThread.selector.wakeup(); -- To view, visit https://asterix-gerrit.ics.uci.edu/1828 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I069dcd59898021054462c8213fb623df2deec598 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
