This is an automated email from the ASF dual-hosted git repository.

nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 3574d8d  parallel nic adding (#5541)
3574d8d is described below

commit 3574d8d20be01b90ca09e58ab4b492604c1a8d1f
Author: dahn <[email protected]>
AuthorDate: Mon Oct 4 17:21:21 2021 +0200

    parallel nic adding (#5541)
    
    * trace nics additions
    
    * work queue patch for network to add
    
    * add secondary key to job
    
    * logging improvements and naming of field(s)
    
    * several naming corrections
    
    * extra check if net already exists for vm
    
    * placeholder job with secondary object
    
    * constraint on entering the same job multiple times
    
    * error handling/warning message
    
    * review comments applied
    
    Co-authored-by: Daan Hoogland <[email protected]>
    Co-authored-by: Wei Zhou <[email protected]>
---
 .../com/cloud/vm/VirtualMachineManagerImpl.java    | 89 +++++++++++++++++-----
 .../resources/META-INF/db/schema-41520to41600.sql  |  3 +
 .../framework/jobs/dao/VmWorkJobDao.java           |  2 +
 .../framework/jobs/dao/VmWorkJobDaoImpl.java       | 15 ++++
 .../cloudstack/framework/jobs/impl/AsyncJobVO.java |  2 +-
 .../framework/jobs/impl/VmWorkJobVO.java           | 24 ++++++
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  | 21 +++--
 7 files changed, 127 insertions(+), 29 deletions(-)

diff --git 
a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
 
b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
index 3aa8a50..5f6f883 100755
--- 
a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
+++ 
b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
+import javax.persistence.EntityExistsException;
 
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
 import org.apache.cloudstack.annotation.AnnotationService;
@@ -3981,7 +3982,7 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         if 
(jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
             VmWorkJobVO placeHolder = null;
-            placeHolder = createPlaceHolderWork(vm.getId());
+            placeHolder = createPlaceHolderWork(vm.getId(), network.getUuid());
             try {
                 return orchestrateAddVmToNetwork(vm, network, requested);
             } finally {
@@ -4021,10 +4022,23 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         }
     }
 
+    /**
+     * duplicated in {@see UserVmManagerImpl} for a {@see UserVmVO}
+     */
+    private void checkIfNetworkExistsForVM(VirtualMachine virtualMachine, 
Network network) {
+        List<NicVO> allNics = _nicsDao.listByVmId(virtualMachine.getId());
+        for (NicVO nic : allNics) {
+            if (nic.getNetworkId() == network.getId()) {
+                throw new CloudRuntimeException("A NIC already exists for VM:" 
+ virtualMachine.getInstanceName() + " in network: " + network.getUuid());
+            }
+        }
+    }
+
     private NicProfile orchestrateAddVmToNetwork(final VirtualMachine vm, 
final Network network, final NicProfile requested) throws 
ConcurrentOperationException, ResourceUnavailableException,
     InsufficientCapacityException {
         final CallContext cctx = CallContext.current();
 
+        checkIfNetworkExistsForVM(vm, network);
         s_logger.debug("Adding vm " + vm + " to network " + network + "; 
requested nic profile " + requested);
         final VMInstanceVO vmVO = _vmDao.findById(vm.getId());
         final ReservationContext context = new ReservationContextImpl(null, 
null, cctx.getCallingUser(), cctx.getCallingAccount());
@@ -5375,7 +5389,7 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         Map<Volume, StoragePool> volumeStorageMap = dest.getStorageForDisks();
         if (volumeStorageMap != null) {
             for (Volume vol : volumeStorageMap.keySet()) {
-                
checkConcurrentJobsPerDatastoreThreshhold(volumeStorageMap.get(vol));
+                
checkConcurrentJobsPerDatastoreThreshold(volumeStorageMap.get(vol));
             }
         }
 
@@ -5540,7 +5554,7 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         return new VmJobVirtualMachineOutcome(workJob, vm.getId());
     }
 
-    private void checkConcurrentJobsPerDatastoreThreshhold(final StoragePool 
destPool) {
+    private void checkConcurrentJobsPerDatastoreThreshold(final StoragePool 
destPool) {
         final Long threshold = 
VolumeApiService.ConcurrentMigrationsThresholdPerDatastore.value();
         if (threshold != null && threshold > 0) {
             long count = _jobMgr.countPendingJobs("\"storageid\":\"" + 
destPool.getUuid() + "\"", MigrateVMCmd.class.getName(), 
MigrateVolumeCmd.class.getName(), MigrateVolumeCmdByAdmin.class.getName());
@@ -5561,7 +5575,7 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         Set<Long> uniquePoolIds = new HashSet<>(poolIds);
         for (Long poolId : uniquePoolIds) {
             StoragePoolVO pool = _storagePoolDao.findById(poolId);
-            checkConcurrentJobsPerDatastoreThreshhold(pool);
+            checkConcurrentJobsPerDatastoreThreshold(pool);
         }
 
         final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -5608,35 +5622,61 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
 
         final List<VmWorkJobVO> pendingWorkJobs = 
_workJobDao.listPendingWorkJobs(
                 VirtualMachine.Type.Instance, vm.getId(),
-                VmWorkAddVmToNetwork.class.getName());
+                VmWorkAddVmToNetwork.class.getName(), network.getUuid());
 
         VmWorkJobVO workJob = null;
         if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
-            assert pendingWorkJobs.size() == 1;
+            if (pendingWorkJobs.size() > 1) {
+                s_logger.warn(String.format("The number of jobs to add network 
%s to vm %s are %d", network.getUuid(), vm.getInstanceName(), 
pendingWorkJobs.size()));
+            }
             workJob = pendingWorkJobs.get(0);
         } else {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace(String.format("no jobs to add network %s for vm 
%s yet", network, vm));
+            }
 
-            workJob = new VmWorkJobVO(context.getContextId());
+            workJob = createVmWorkJobToAddNetwork(vm, network, requested, 
context, user, account);
+        }
+        
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 
-            workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
-            workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+        return new VmJobVirtualMachineOutcome(workJob, vm.getId());
+    }
 
-            workJob.setAccountId(account.getId());
-            workJob.setUserId(user.getId());
-            workJob.setVmType(VirtualMachine.Type.Instance);
-            workJob.setVmInstanceId(vm.getId());
-            workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
+    private VmWorkJobVO createVmWorkJobToAddNetwork(
+            VirtualMachine vm,
+            Network network,
+            NicProfile requested,
+            CallContext context,
+            User user,
+            Account account) {
+        VmWorkJobVO workJob;
+        workJob = new VmWorkJobVO(context.getContextId());
 
-            // save work context info (there are some duplications)
-            final VmWorkAddVmToNetwork workInfo = new 
VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
-                    VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, 
network.getId(), requested);
-            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+        workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
 
+        workJob.setAccountId(account.getId());
+        workJob.setUserId(user.getId());
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(vm.getId());
+        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
+        workJob.setSecondaryObjectIdentifier(network.getUuid());
+
+        // save work context info (there are some duplications)
+        final VmWorkAddVmToNetwork workInfo = new 
VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
+                VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, 
network.getId(), requested);
+        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+        try {
             _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, 
vm.getId());
+        } catch (CloudRuntimeException e) {
+            if (e.getCause() instanceof EntityExistsException) {
+                String msg = String.format("A job to add a nic for network %s 
to vm %s already exists", network.getUuid(), vm.getUuid());
+                s_logger.warn(msg, e);
+            }
+            throw e;
         }
-        
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
-
-        return new VmJobVirtualMachineOutcome(workJob, vm.getId());
+        return workJob;
     }
 
     public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
@@ -5945,6 +5985,10 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
     }
 
     private VmWorkJobVO createPlaceHolderWork(final long instanceId) {
+        return createPlaceHolderWork(instanceId, null);
+    }
+
+    private VmWorkJobVO createPlaceHolderWork(final long instanceId, String 
secondaryObjectIdentifier) {
         final VmWorkJobVO workJob = new VmWorkJobVO("");
 
         workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
@@ -5956,6 +6000,9 @@ public class VirtualMachineManagerImpl extends 
ManagerBase implements VirtualMac
         workJob.setStep(VmWorkJobVO.Step.Starting);
         workJob.setVmType(VirtualMachine.Type.Instance);
         workJob.setVmInstanceId(instanceId);
+        if(StringUtils.isNotBlank(secondaryObjectIdentifier)) {
+            workJob.setSecondaryObjectIdentifier(secondaryObjectIdentifier);
+        }
         workJob.setInitMsid(ManagementServerNode.getManagementServerId());
 
         _workJobDao.persist(workJob);
diff --git 
a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql 
b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
index 64c381e..abbf4a0 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql
@@ -791,3 +791,6 @@ ALTER TABLE cloud.user_vm_details MODIFY value 
varchar(5120) NOT NULL;
 ALTER TABLE cloud_usage.usage_network DROP PRIMARY KEY, ADD PRIMARY KEY 
(`account_id`,`zone_id`,`host_id`,`network_id`,`event_time_millis`);
 ALTER TABLE `cloud`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY 
`account_id`  
(`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, 
`network_id`);
 ALTER TABLE `cloud_usage`.`user_statistics` DROP INDEX `account_id`, ADD 
UNIQUE KEY `account_id`  
(`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, 
`network_id`);
+
+ALTER TABLE `cloud`.`vm_work_job` ADD COLUMN `secondary_object` char(100) 
COMMENT 'any additional item that must be checked during queueing' AFTER 
`vm_instance_id`;
+ALTER TABLE cloud.vm_work_job ADD CONSTRAINT vm_work_job_step_and_objects 
UNIQUE KEY (step,vm_instance_id,secondary_object);
diff --git 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
index 44e39e4..89601e6 100644
--- 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
+++ 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
@@ -32,6 +32,8 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, 
Long> {
 
     List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long 
instanceId, String jobCmd);
 
+    List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long 
instanceId, String jobCmd, String secondaryObjectIdentifier);
+
     void updateStep(long workJobId, Step step);
 
     void expungeCompletedWorkJobs(Date cutDate);
diff --git 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
index e81ab1e..497f12d 100644
--- 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
+++ 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
@@ -67,6 +67,7 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
         PendingWorkJobByCommandSearch.and("jobStatus", 
PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
         PendingWorkJobByCommandSearch.and("vmType", 
PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
         PendingWorkJobByCommandSearch.and("vmInstanceId", 
PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
+        PendingWorkJobByCommandSearch.and("secondaryObjectIdentifier", 
PendingWorkJobByCommandSearch.entity().getSecondaryObjectIdentifier(), Op.EQ);
         PendingWorkJobByCommandSearch.and("step", 
PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
         PendingWorkJobByCommandSearch.and("cmd", 
PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
         PendingWorkJobByCommandSearch.done();
@@ -120,6 +121,20 @@ public class VmWorkJobDaoImpl extends 
GenericDaoBase<VmWorkJobVO, Long> implemen
     }
 
     @Override
+    public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, 
long instanceId, String jobCmd, String secondaryObjectIdentifier) {
+
+        SearchCriteria<VmWorkJobVO> sc = 
PendingWorkJobByCommandSearch.create();
+        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+        sc.setParameters("vmType", type);
+        sc.setParameters("vmInstanceId", instanceId);
+        sc.setParameters("secondaryObjectIdentifier", 
secondaryObjectIdentifier);
+        sc.setParameters("cmd", jobCmd);
+
+        Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, 
null);
+        return this.listBy(sc, filter);
+    }
+
+    @Override
     public void updateStep(long workJobId, Step step) {
         VmWorkJobVO jobVo = findById(workJobId);
         jobVo.setStep(step);
diff --git 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
index 9d30c2c..777fcba 100644
--- 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
+++ 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java
@@ -384,7 +384,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo {
     @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
-        sb.append("AsyncJobVO {id:").append(getId());
+        sb.append("AsyncJobVO : {id:").append(getId());
         sb.append(", userId: ").append(getUserId());
         sb.append(", accountId: ").append(getAccountId());
         sb.append(", instanceType: ").append(getInstanceType());
diff --git 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
index ef0ac7d..a8a05d4 100644
--- 
a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
+++ 
b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java
@@ -58,6 +58,9 @@ public class VmWorkJobVO extends AsyncJobVO {
     @Column(name = "vm_instance_id")
     long vmInstanceId;
 
+    @Column(name = "secondary_object")
+    String secondaryObjectIdentifier;
+
     protected VmWorkJobVO() {
     }
 
@@ -89,4 +92,25 @@ public class VmWorkJobVO extends AsyncJobVO {
     public void setVmInstanceId(long vmInstanceId) {
         this.vmInstanceId = vmInstanceId;
     }
+
+    public String getSecondaryObjectIdentifier() {
+        return secondaryObjectIdentifier;
+    }
+
+    public void setSecondaryObjectIdentifier(String secondaryObjectIdentifier) 
{
+        this.secondaryObjectIdentifier = secondaryObjectIdentifier;
+    }
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("VmWorkJobVO : {").
+                append(", step: ").append(getStep()).
+                append(", vmType: ").append(getVmType()).
+                append(", vmInstanceId: ").append(getVmInstanceId()).
+                append(", secondaryObjectIdentifier: 
").append(getSecondaryObjectIdentifier()).
+                append(super.toString()).
+                append("}");
+        return sb.toString();
+    }
+
 }
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java 
b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index d371e58..4249692 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -1384,12 +1384,7 @@ public class UserVmManagerImpl extends ManagerBase 
implements UserVmManager, Vir
         Account vmOwner = _accountMgr.getAccount(vmInstance.getAccountId());
         _networkModel.checkNetworkPermissions(vmOwner, network);
 
-        List<NicVO> allNics = _nicDao.listByVmId(vmInstance.getId());
-        for (NicVO nic : allNics) {
-            if (nic.getNetworkId() == network.getId()) {
-                throw new CloudRuntimeException("A NIC already exists for VM:" 
+ vmInstance.getInstanceName() + " in network: " + network.getUuid());
-            }
-        }
+        checkIfNetExistsForVM(vmInstance, network);
 
         macAddress = validateOrReplaceMacAddress(macAddress, network.getId());
 
@@ -1456,11 +1451,23 @@ public class UserVmManagerImpl extends ManagerBase 
implements UserVmManager, Vir
             }
         }
         CallContext.current().putContextParameter(Nic.class, 
guestNic.getUuid());
-        s_logger.debug("Successful addition of " + network + " from " + 
vmInstance);
+        s_logger.debug(String.format("Successful addition of %s from %s 
through %s", network, vmInstance, guestNic));
         return _vmDao.findById(vmInstance.getId());
     }
 
     /**
+     * duplicated in {@see VirtualMachineManagerImpl} for a {@see VMInstanceVO}
+     */
+    private void checkIfNetExistsForVM(VirtualMachine virtualMachine, Network 
network) {
+        List<NicVO> allNics = _nicDao.listByVmId(virtualMachine.getId());
+        for (NicVO nic : allNics) {
+            if (nic.getNetworkId() == network.getId()) {
+                throw new CloudRuntimeException("A NIC already exists for VM:" 
+ virtualMachine.getInstanceName() + " in network: " + network.getUuid());
+            }
+        }
+    }
+
+    /**
      * If the given MAC address is invalid it replaces the given MAC with the 
next available MAC address
      */
     protected String validateOrReplaceMacAddress(String macAddress, long 
networkId) {

Reply via email to