Updated Branches:
  refs/heads/vmsync 444a15b61 -> 006b2f8ed

add support to job joining


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/006b2f8e
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/006b2f8e
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/006b2f8e

Branch: refs/heads/vmsync
Commit: 006b2f8ed8bdb6c1b669a95379911017ff0d9a39
Parents: 444a15b
Author: Kelven Yang <kelv...@gmail.com>
Authored: Tue May 7 13:26:37 2013 -0700
Committer: Kelven Yang <kelv...@gmail.com>
Committed: Tue May 7 13:26:37 2013 -0700

----------------------------------------------------------------------
 core/src/com/cloud/async/AsyncJobJoinMapVO.java    |  141 +++++++++++++++
 .../com/cloud/async/dao/AsyncJobJoinMapDao.java    |   27 +++
 .../cloud/async/dao/AsyncJobJoinMapDaoImpl.java    |   92 ++++++++++
 .../com/cloud/async/AsyncJobTestConfiguration.java |    7 +
 .../cloud/async/SyncQueueTestConfiguration.java    |    7 +
 .../test/com/cloud/async/TestAsyncJobManager.java  |   32 ++++
 setup/db/db/schema-410to420.sql                    |   20 ++
 7 files changed, 326 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/core/src/com/cloud/async/AsyncJobJoinMapVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/async/AsyncJobJoinMapVO.java 
b/core/src/com/cloud/async/AsyncJobJoinMapVO.java
new file mode 100644
index 0000000..f65d7cf
--- /dev/null
+++ b/core/src/com/cloud/async/AsyncJobJoinMapVO.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name="async_job_join_map")
+public class AsyncJobJoinMapVO {
+       @Id
+    @GeneratedValue(strategy=GenerationType.IDENTITY)
+    @Column(name="id")
+    private Long id = null;
+
+    @Column(name="job_id")
+       private long jobId;
+    
+    @Column(name="join_job_id")
+       private long joinJobId;
+
+    @Column(name="join_status")
+    private int joinStatus;
+    
+    @Column(name="join_result", length=1024)
+    private String joinResult;
+
+    @Column(name="join_msid")
+    private long joinMsid;
+
+    @Column(name="complete_msid")
+    private Long completeMsid;
+
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created;
+    
+    @Column(name="last_updated")
+    @Temporal(TemporalType.TIMESTAMP)
+    private Date lastUpdated;
+
+    public AsyncJobJoinMapVO() {
+       created = DateUtil.currentGMTTime();
+    }
+    
+       public Long getId() {
+               return id;
+       }
+
+       public void setId(Long id) {
+               this.id = id;
+       }
+
+       public long getJobId() {
+               return jobId;
+       }
+
+       public void setJobId(long jobId) {
+               this.jobId = jobId;
+       }
+
+       public long getJoinJobId() {
+               return joinJobId;
+       }
+
+       public void setJoinJobId(long joinJobId) {
+               this.joinJobId = joinJobId;
+       }
+
+       public int getJoinStatus() {
+               return joinStatus;
+       }
+
+       public void setJoinStatus(int joinStatus) {
+               this.joinStatus = joinStatus;
+       }
+
+       public String getJoinResult() {
+               return joinResult;
+       }
+
+       public void setJoinResult(String joinResult) {
+               this.joinResult = joinResult;
+       }
+
+       public long getJoinMsid() {
+               return joinMsid;
+       }
+
+       public void setJoinMsid(long joinMsid) {
+               this.joinMsid = joinMsid;
+       }
+
+       public Long getCompleteMsid() {
+               return completeMsid;
+       }
+
+       public void setCompleteMsid(Long completeMsid) {
+               this.completeMsid = completeMsid;
+       }
+
+       public Date getCreated() {
+               return created;
+       }
+
+       public void setCreated(Date created) {
+               this.created = created;
+       }
+
+       public Date getLastUpdated() {
+               return lastUpdated;
+       }
+
+       public void setLastUpdated(Date lastUpdated) {
+               this.lastUpdated = lastUpdated;
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java 
b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
new file mode 100644
index 0000000..c4e1105
--- /dev/null
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async.dao;
+
+import com.cloud.async.AsyncJobJoinMapVO;
+import com.cloud.utils.db.GenericDao;
+
+public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, 
Long> {
+       Long joinJob(long jobId, long joinJobId, long joinMsid);
+       void disjoinJob(long jobId, long joinedJobId);
+       AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId);
+       void completeJoin(long joinJobId, int joinStatus, String joinResult, 
long completeMsid);
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java 
b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
new file mode 100644
index 0000000..0da2968
--- /dev/null
+++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.async.dao;
+
+import java.util.List;
+
+import com.cloud.async.AsyncJobJoinMapVO;
+import com.cloud.async.AsyncJobResult;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.UpdateBuilder;
+import com.cloud.utils.db.SearchCriteria.Op;
+
+public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, 
Long> implements AsyncJobJoinMapDao {
+       
+       private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch;    
+       private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch;      
+       
+       public AsyncJobJoinMapDaoImpl() {
+               RecordSearch = createSearchBuilder();
+               RecordSearch.and("jobId", RecordSearch.entity().getJobId(), 
Op.EQ);
+               RecordSearch.and("joinJobId", 
RecordSearch.entity().getJoinJobId(), Op.EQ);
+               RecordSearch.done();
+               
+               CompleteJoinSearch = createSearchBuilder();
+               CompleteJoinSearch.and("joinJobId", 
CompleteJoinSearch.entity().getJoinJobId(), Op.EQ);
+               CompleteJoinSearch.done();
+       }
+       
+       public Long joinJob(long jobId, long joinJobId, long joinMsid) {
+               AsyncJobJoinMapVO record = new AsyncJobJoinMapVO();
+               record.setJobId(jobId);
+               record.setJoinJobId(joinJobId);
+               record.setJoinMsid(joinMsid);
+               record.setJoinStatus(AsyncJobResult.STATUS_IN_PROGRESS);
+               
+               this.persist(record);
+               return record.getId();
+       }
+       
+       public void disjoinJob(long jobId, long joinedJobId) {
+               SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
+               sc.setParameters("jobId", jobId);
+               sc.setParameters("joinJobId", joinedJobId);
+               
+               this.expunge(sc);
+       }
+       
+       public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) {
+               SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create();
+               sc.setParameters("jobId", jobId);
+               sc.setParameters("joinJobId", joinJobId);
+               
+               List<AsyncJobJoinMapVO> result = this.listBy(sc);
+               if(result != null && result.size() > 0) {
+                       assert(result.size() == 1);
+                       return result.get(0);
+               }
+               
+               return null;
+       }
+       
+       public void completeJoin(long joinJobId, int joinStatus, String 
joinResult, long completeMsid) {
+        AsyncJobJoinMapVO record = createForUpdate();
+        record.setJoinStatus(joinStatus);
+        record.setJoinResult(joinResult);
+        record.setCompleteMsid(completeMsid);
+        record.setLastUpdated(DateUtil.currentGMTTime());
+        
+        UpdateBuilder ub = getUpdateBuilder(record);
+        
+        SearchCriteria<AsyncJobJoinMapVO> sc = CompleteJoinSearch.create();
+        sc.setParameters("joinJobId", joinJobId);
+        update(ub, sc, null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/AsyncJobTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/AsyncJobTestConfiguration.java 
b/server/test/com/cloud/async/AsyncJobTestConfiguration.java
index 97345dd..12ee2f8 100644
--- a/server/test/com/cloud/async/AsyncJobTestConfiguration.java
+++ b/server/test/com/cloud/async/AsyncJobTestConfiguration.java
@@ -23,6 +23,8 @@ import org.springframework.context.annotation.Configuration;
 import com.cloud.api.ApiDispatcher;
 import com.cloud.async.dao.AsyncJobDao;
 import com.cloud.async.dao.AsyncJobDaoImpl;
+import com.cloud.async.dao.AsyncJobJoinMapDao;
+import com.cloud.async.dao.AsyncJobJoinMapDaoImpl;
 import com.cloud.async.dao.AsyncJobJournalDao;
 import com.cloud.async.dao.AsyncJobJournalDaoImpl;
 import com.cloud.async.dao.SyncQueueDao;
@@ -105,4 +107,9 @@ public class AsyncJobTestConfiguration {
        public AsyncJobJournalDao asyncJobJournalDao() {
                return new AsyncJobJournalDaoImpl();
        }
+       
+       @Bean
+       public AsyncJobJoinMapDao asyncJobJoinMapDao() {
+               return new AsyncJobJoinMapDaoImpl();
+       }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/SyncQueueTestConfiguration.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/SyncQueueTestConfiguration.java 
b/server/test/com/cloud/async/SyncQueueTestConfiguration.java
index 114fe39..424a3d6 100644
--- a/server/test/com/cloud/async/SyncQueueTestConfiguration.java
+++ b/server/test/com/cloud/async/SyncQueueTestConfiguration.java
@@ -20,6 +20,8 @@ import org.mockito.Mockito;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import com.cloud.async.dao.AsyncJobJoinMapDao;
+import com.cloud.async.dao.AsyncJobJoinMapDaoImpl;
 import com.cloud.async.dao.SyncQueueDao;
 import com.cloud.async.dao.SyncQueueDaoImpl;
 import com.cloud.async.dao.SyncQueueItemDao;
@@ -54,5 +56,10 @@ public class SyncQueueTestConfiguration {
        public ConfigurationManager configurationManager() {
                return Mockito.mock(ConfigurationManager.class);
        }
+       
+       @Bean
+       public AsyncJobJoinMapDao joinMapDao() {
+               return new AsyncJobJoinMapDaoImpl();
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java 
b/server/test/com/cloud/async/TestAsyncJobManager.java
index 1b78557..5f99750 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -38,6 +38,7 @@ import 
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 import com.cloud.async.AsyncJobJournalVO;
 import com.cloud.async.AsyncJobManager;
 import com.cloud.async.AsyncJobMonitor;
+import com.cloud.async.dao.AsyncJobJoinMapDao;
 import com.cloud.async.dao.AsyncJobJournalDao;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.utils.Predicate;
@@ -53,6 +54,7 @@ public class TestAsyncJobManager extends TestCase {
     @Inject MessageBus messageBus;
     @Inject AsyncJobMonitor jobMonitor;
     @Inject AsyncJobJournalDao journalDao;
+    @Inject AsyncJobJoinMapDao joinMapDao;
     
     @Before                                                  
     public void setUp() {
@@ -111,6 +113,36 @@ public class TestAsyncJobManager extends TestCase {
     }
     
     @Test
+    public void testJoinMapDao() {
+       joinMapDao.joinJob(2, 1, 100);
+       joinMapDao.joinJob(3, 1, 100);
+  
+       AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1);
+       Assert.assertTrue(record != null);
+       Assert.assertTrue(record.getJoinMsid() == 100);
+       Assert.assertTrue(record.getJoinStatus() == 
AsyncJobResult.STATUS_IN_PROGRESS);
+       
+       joinMapDao.completeJoin(1, AsyncJobResult.STATUS_SUCCEEDED, "Done", 
101);
+       
+       record = joinMapDao.getJoinRecord(2, 1);
+       Assert.assertTrue(record != null);
+       Assert.assertTrue(record.getJoinMsid() == 100);
+       Assert.assertTrue(record.getJoinStatus() == 
AsyncJobResult.STATUS_SUCCEEDED);
+       Assert.assertTrue(record.getJoinResult().equals("Done"));
+       Assert.assertTrue(record.getCompleteMsid() == 101);
+       
+       record = joinMapDao.getJoinRecord(3, 1);
+       Assert.assertTrue(record != null);
+       Assert.assertTrue(record.getJoinMsid() == 100);
+       Assert.assertTrue(record.getJoinStatus() == 
AsyncJobResult.STATUS_SUCCEEDED);
+       Assert.assertTrue(record.getJoinResult().equals("Done"));
+       Assert.assertTrue(record.getCompleteMsid() == 101);
+       
+       joinMapDao.disjoinJob(2, 1);
+       joinMapDao.disjoinJob(3, 1);
+    }
+    
+    @Test
     public void testWaitAndCheck() {
                Thread thread = new Thread(new Runnable() {
                        @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/setup/db/db/schema-410to420.sql
----------------------------------------------------------------------
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index c615c4f..87f1cc7 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -442,3 +442,23 @@ CREATE TABLE `cloud`.`async_job_journal` (
   PRIMARY KEY (`id`),
   CONSTRAINT `fk_async_job_journal__job_id` FOREIGN KEY (`job_id`) REFERENCES 
`async_job`(`id`) ON DELETE CASCADE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+CREATE TABLE `cloud`.`async_job_join_map` (
+  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
+  `job_id` bigint unsigned NOT NULL,
+  `join_job_id` bigint unsigned NOT NULL,
+  `join_status` int NOT NULL,
+  `join_result` varchar(1024),
+  `join_msid` bigint,
+  `complete_msid` bigint,
+  `created` datetime NOT NULL,
+  `last_updated` datetime,
+  PRIMARY KEY (`id`),
+  CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES 
`async_job`(`id`) ON DELETE CASCADE,
+  CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) 
REFERENCES `async_job`(`id`),
+  CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`),
+  INDEX `i_async_job_join_map__join_job_id`(`join_job_id`),
+  INDEX `i_async_job_join_map__created`(`created`),
+  INDEX `i_async_job_join_map__last_updated`(`last_updated`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+

Reply via email to