This is an automated email from the ASF dual-hosted git repository.
nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/main by this push:
new 3574d8d parallel nic adding (#5541)
3574d8d is described below
commit 3574d8d20be01b90ca09e58ab4b492604c1a8d1f
Author: dahn <[email protected]>
AuthorDate: Mon Oct 4 17:21:21 2021 +0200
parallel nic adding (#5541)
* trace nics additions
* work queue patch for network to add
* add secondary key to job
* logging improvements and naming of field(s)
* several naming corrections
* extra check if net already exists for vm
* placeholder job with secondary object
* constraint on entering the same job multiple times
* error handling/warning message
* review comments applied
Co-authored-by: Daan Hoogland <[email protected]>
Co-authored-by: Wei Zhou <[email protected]>
---
.../com/cloud/vm/VirtualMachineManagerImpl.java | 89 +++++++++++++++++-----
.../resources/META-INF/db/schema-41520to41600.sql | 3 +
.../framework/jobs/dao/VmWorkJobDao.java | 2 +
.../framework/jobs/dao/VmWorkJobDaoImpl.java | 15 ++++
.../cloudstack/framework/jobs/impl/AsyncJobVO.java | 2 +-
.../framework/jobs/impl/VmWorkJobVO.java | 24 ++++++
.../main/java/com/cloud/vm/UserVmManagerImpl.java | 21 +++--
7 files changed, 127 insertions(+), 29 deletions(-)
diff --git
a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
index 3aa8a50..5f6f883 100755
---
a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
+++
b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
+import javax.persistence.EntityExistsException;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.annotation.AnnotationService;
@@ -3981,7 +3982,7 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
if
(jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
VmWorkJobVO placeHolder = null;
- placeHolder = createPlaceHolderWork(vm.getId());
+ placeHolder = createPlaceHolderWork(vm.getId(), network.getUuid());
try {
return orchestrateAddVmToNetwork(vm, network, requested);
} finally {
@@ -4021,10 +4022,23 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
}
}
+ /**
+ * duplicated in {@see UserVmManagerImpl} for a {@see UserVmVO}
+ */
+ private void checkIfNetworkExistsForVM(VirtualMachine virtualMachine,
Network network) {
+ List<NicVO> allNics = _nicsDao.listByVmId(virtualMachine.getId());
+ for (NicVO nic : allNics) {
+ if (nic.getNetworkId() == network.getId()) {
+ throw new CloudRuntimeException("A NIC already exists for VM:"
+ virtualMachine.getInstanceName() + " in network: " + network.getUuid());
+ }
+ }
+ }
+
private NicProfile orchestrateAddVmToNetwork(final VirtualMachine vm,
final Network network, final NicProfile requested) throws
ConcurrentOperationException, ResourceUnavailableException,
InsufficientCapacityException {
final CallContext cctx = CallContext.current();
+ checkIfNetworkExistsForVM(vm, network);
s_logger.debug("Adding vm " + vm + " to network " + network + ";
requested nic profile " + requested);
final VMInstanceVO vmVO = _vmDao.findById(vm.getId());
final ReservationContext context = new ReservationContextImpl(null,
null, cctx.getCallingUser(), cctx.getCallingAccount());
@@ -5375,7 +5389,7 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
Map<Volume, StoragePool> volumeStorageMap = dest.getStorageForDisks();
if (volumeStorageMap != null) {
for (Volume vol : volumeStorageMap.keySet()) {
-
checkConcurrentJobsPerDatastoreThreshhold(volumeStorageMap.get(vol));
+
checkConcurrentJobsPerDatastoreThreshold(volumeStorageMap.get(vol));
}
}
@@ -5540,7 +5554,7 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
return new VmJobVirtualMachineOutcome(workJob, vm.getId());
}
- private void checkConcurrentJobsPerDatastoreThreshhold(final StoragePool
destPool) {
+ private void checkConcurrentJobsPerDatastoreThreshold(final StoragePool
destPool) {
final Long threshold =
VolumeApiService.ConcurrentMigrationsThresholdPerDatastore.value();
if (threshold != null && threshold > 0) {
long count = _jobMgr.countPendingJobs("\"storageid\":\"" +
destPool.getUuid() + "\"", MigrateVMCmd.class.getName(),
MigrateVolumeCmd.class.getName(), MigrateVolumeCmdByAdmin.class.getName());
@@ -5561,7 +5575,7 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
Set<Long> uniquePoolIds = new HashSet<>(poolIds);
for (Long poolId : uniquePoolIds) {
StoragePoolVO pool = _storagePoolDao.findById(poolId);
- checkConcurrentJobsPerDatastoreThreshhold(pool);
+ checkConcurrentJobsPerDatastoreThreshold(pool);
}
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -5608,35 +5622,61 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
final List<VmWorkJobVO> pendingWorkJobs =
_workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vm.getId(),
- VmWorkAddVmToNetwork.class.getName());
+ VmWorkAddVmToNetwork.class.getName(), network.getUuid());
VmWorkJobVO workJob = null;
if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert pendingWorkJobs.size() == 1;
+ if (pendingWorkJobs.size() > 1) {
+ s_logger.warn(String.format("The number of jobs to add network
%s to vm %s are %d", network.getUuid(), vm.getInstanceName(),
pendingWorkJobs.size()));
+ }
workJob = pendingWorkJobs.get(0);
} else {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("no jobs to add network %s for vm
%s yet", network, vm));
+ }
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = createVmWorkJobToAddNetwork(vm, network, requested,
context, user, account);
+ }
+
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
- workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+ return new VmJobVirtualMachineOutcome(workJob, vm.getId());
+ }
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
- workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
- workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
+ private VmWorkJobVO createVmWorkJobToAddNetwork(
+ VirtualMachine vm,
+ Network network,
+ NicProfile requested,
+ CallContext context,
+ User user,
+ Account account) {
+ VmWorkJobVO workJob;
+ workJob = new VmWorkJobVO(context.getContextId());
- // save work context info (there are some duplications)
- final VmWorkAddVmToNetwork workInfo = new
VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
- VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER,
network.getId(), requested);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setVmType(VirtualMachine.Type.Instance);
+ workJob.setVmInstanceId(vm.getId());
+ workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
+ workJob.setSecondaryObjectIdentifier(network.getUuid());
+
+ // save work context info (there are some duplications)
+ final VmWorkAddVmToNetwork workInfo = new
VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
+ VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER,
network.getId(), requested);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ try {
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE,
vm.getId());
+ } catch (CloudRuntimeException e) {
+ if (e.getCause() instanceof EntityExistsException) {
+ String msg = String.format("A job to add a nic for network %s
to vm %s already exists", network.getUuid(), vm.getUuid());
+ s_logger.warn(msg, e);
+ }
+ throw e;
}
-
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
-
- return new VmJobVirtualMachineOutcome(workJob, vm.getId());
+ return workJob;
}
public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
@@ -5945,6 +5985,10 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
}
private VmWorkJobVO createPlaceHolderWork(final long instanceId) {
+ return createPlaceHolderWork(instanceId, null);
+ }
+
+ private VmWorkJobVO createPlaceHolderWork(final long instanceId, String
secondaryObjectIdentifier) {
final VmWorkJobVO workJob = new VmWorkJobVO("");
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
@@ -5956,6 +6000,9 @@ public class VirtualMachineManagerImpl extends
ManagerBase implements VirtualMac
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(instanceId);
+ if(StringUtils.isNotBlank(secondaryObjectIdentifier)) {
+ workJob.setSecondaryObjectIdentifier(secondaryObjectIdentifier);
+ }
workJob.setInitMsid(ManagementServerNode.getManagementServerId());
_workJobDao.persist(workJob);
diff --git
a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
index 64c381e..abbf4a0 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
@@ -791,3 +791,6 @@ ALTER TABLE cloud.user_vm_details MODIFY value
varchar(5120) NOT NULL;
ALTER TABLE cloud_usage.usage_network DROP PRIMARY KEY, ADD PRIMARY KEY
(`account_id`,`zone_id`,`host_id`,`network_id`,`event_time_millis`);
ALTER TABLE `cloud`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY
`account_id`
(`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`,
`network_id`);
ALTER TABLE `cloud_usage`.`user_statistics` DROP INDEX `account_id`, ADD
UNIQUE KEY `account_id`
(`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`,
`network_id`);
+
+ALTER TABLE `cloud`.`vm_work_job` ADD COLUMN `secondary_object` char(100)
COMMENT 'any additional item that must be checked during queueing' AFTER
`vm_instance_id`;
+ALTER TABLE cloud.vm_work_job ADD CONSTRAINT vm_work_job_step_and_objects
UNIQUE KEY (step,vm_instance_id,secondary_object);
diff --git
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
index 44e39e4..89601e6 100644
---
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
+++
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
@@ -32,6 +32,8 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO,
Long> {
List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long
instanceId, String jobCmd);
+ List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long
instanceId, String jobCmd, String secondaryObjectIdentifier);
+
void updateStep(long workJobId, Step step);
void expungeCompletedWorkJobs(Date cutDate);
diff --git
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
index e81ab1e..497f12d 100644
---
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
+++
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
@@ -67,6 +67,7 @@ public class VmWorkJobDaoImpl extends
GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobByCommandSearch.and("jobStatus",
PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmType",
PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
PendingWorkJobByCommandSearch.and("vmInstanceId",
PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
+ PendingWorkJobByCommandSearch.and("secondaryObjectIdentifier",
PendingWorkJobByCommandSearch.entity().getSecondaryObjectIdentifier(), Op.EQ);
PendingWorkJobByCommandSearch.and("step",
PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
PendingWorkJobByCommandSearch.and("cmd",
PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
PendingWorkJobByCommandSearch.done();
@@ -120,6 +121,20 @@ public class VmWorkJobDaoImpl extends
GenericDaoBase<VmWorkJobVO, Long> implemen
}
@Override
+ public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type,
long instanceId, String jobCmd, String secondaryObjectIdentifier) {
+
+ SearchCriteria<VmWorkJobVO> sc =
PendingWorkJobByCommandSearch.create();
+ sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+ sc.setParameters("vmType", type);
+ sc.setParameters("vmInstanceId", instanceId);
+ sc.setParameters("secondaryObjectIdentifier",
secondaryObjectIdentifier);
+ sc.setParameters("cmd", jobCmd);
+
+ Filter filter = new Filter(VmWorkJobVO.class, "created", true, null,
null);
+ return this.listBy(sc, filter);
+ }
+
+ @Override
public void updateStep(long workJobId, Step step) {
VmWorkJobVO jobVo = findById(workJobId);
jobVo.setStep(step);
diff --git
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
index 9d30c2c..777fcba 100644
---
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
+++
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -384,7 +384,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
- sb.append("AsyncJobVO {id:").append(getId());
+ sb.append("AsyncJobVO : {id:").append(getId());
sb.append(", userId: ").append(getUserId());
sb.append(", accountId: ").append(getAccountId());
sb.append(", instanceType: ").append(getInstanceType());
diff --git
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
index ef0ac7d..a8a05d4 100644
---
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
+++
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
@@ -58,6 +58,9 @@ public class VmWorkJobVO extends AsyncJobVO {
@Column(name = "vm_instance_id")
long vmInstanceId;
+ @Column(name = "secondary_object")
+ String secondaryObjectIdentifier;
+
protected VmWorkJobVO() {
}
@@ -89,4 +92,25 @@ public class VmWorkJobVO extends AsyncJobVO {
public void setVmInstanceId(long vmInstanceId) {
this.vmInstanceId = vmInstanceId;
}
+
+ public String getSecondaryObjectIdentifier() {
+ return secondaryObjectIdentifier;
+ }
+
+ public void setSecondaryObjectIdentifier(String secondaryObjectIdentifier)
{
+ this.secondaryObjectIdentifier = secondaryObjectIdentifier;
+ }
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("VmWorkJobVO : {").
+ append(", step: ").append(getStep()).
+ append(", vmType: ").append(getVmType()).
+ append(", vmInstanceId: ").append(getVmInstanceId()).
+ append(", secondaryObjectIdentifier:
").append(getSecondaryObjectIdentifier()).
+ append(super.toString()).
+ append("}");
+ return sb.toString();
+ }
+
}
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index d371e58..4249692 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -1384,12 +1384,7 @@ public class UserVmManagerImpl extends ManagerBase
implements UserVmManager, Vir
Account vmOwner = _accountMgr.getAccount(vmInstance.getAccountId());
_networkModel.checkNetworkPermissions(vmOwner, network);
- List<NicVO> allNics = _nicDao.listByVmId(vmInstance.getId());
- for (NicVO nic : allNics) {
- if (nic.getNetworkId() == network.getId()) {
- throw new CloudRuntimeException("A NIC already exists for VM:"
+ vmInstance.getInstanceName() + " in network: " + network.getUuid());
- }
- }
+ checkIfNetExistsForVM(vmInstance, network);
macAddress = validateOrReplaceMacAddress(macAddress, network.getId());
@@ -1456,11 +1451,23 @@ public class UserVmManagerImpl extends ManagerBase
implements UserVmManager, Vir
}
}
CallContext.current().putContextParameter(Nic.class,
guestNic.getUuid());
- s_logger.debug("Successful addition of " + network + " from " +
vmInstance);
+ s_logger.debug(String.format("Successful addition of %s from %s
through %s", network, vmInstance, guestNic));
return _vmDao.findById(vmInstance.getId());
}
/**
+ * duplicated in {@see VirtualMachineManagerImpl} for a {@see VMInstanceVO}
+ */
+ private void checkIfNetExistsForVM(VirtualMachine virtualMachine, Network
network) {
+ List<NicVO> allNics = _nicDao.listByVmId(virtualMachine.getId());
+ for (NicVO nic : allNics) {
+ if (nic.getNetworkId() == network.getId()) {
+ throw new CloudRuntimeException("A NIC already exists for VM:"
+ virtualMachine.getInstanceName() + " in network: " + network.getUuid());
+ }
+ }
+ }
+
+ /**
* If the given MAC address is invalid it replaces the given MAC with the
next available MAC address
*/
protected String validateOrReplaceMacAddress(String macAddress, long
networkId) {