Updated Branches: refs/heads/vmsync 444a15b61 -> 006b2f8ed
add support to job joining Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/006b2f8e Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/006b2f8e Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/006b2f8e Branch: refs/heads/vmsync Commit: 006b2f8ed8bdb6c1b669a95379911017ff0d9a39 Parents: 444a15b Author: Kelven Yang <kelv...@gmail.com> Authored: Tue May 7 13:26:37 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Tue May 7 13:26:37 2013 -0700 ---------------------------------------------------------------------- core/src/com/cloud/async/AsyncJobJoinMapVO.java | 141 +++++++++++++++ .../com/cloud/async/dao/AsyncJobJoinMapDao.java | 27 +++ .../cloud/async/dao/AsyncJobJoinMapDaoImpl.java | 92 ++++++++++ .../com/cloud/async/AsyncJobTestConfiguration.java | 7 + .../cloud/async/SyncQueueTestConfiguration.java | 7 + .../test/com/cloud/async/TestAsyncJobManager.java | 32 ++++ setup/db/db/schema-410to420.sql | 20 ++ 7 files changed, 326 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/core/src/com/cloud/async/AsyncJobJoinMapVO.java ---------------------------------------------------------------------- diff --git a/core/src/com/cloud/async/AsyncJobJoinMapVO.java b/core/src/com/cloud/async/AsyncJobJoinMapVO.java new file mode 100644 index 0000000..f65d7cf --- /dev/null +++ b/core/src/com/cloud/async/AsyncJobJoinMapVO.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async; + +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.GenericDao; + +@Entity +@Table(name="async_job_join_map") +public class AsyncJobJoinMapVO { + @Id + @GeneratedValue(strategy=GenerationType.IDENTITY) + @Column(name="id") + private Long id = null; + + @Column(name="job_id") + private long jobId; + + @Column(name="join_job_id") + private long joinJobId; + + @Column(name="join_status") + private int joinStatus; + + @Column(name="join_result", length=1024) + private String joinResult; + + @Column(name="join_msid") + private long joinMsid; + + @Column(name="complete_msid") + private Long completeMsid; + + @Column(name=GenericDao.CREATED_COLUMN) + private Date created; + + @Column(name="last_updated") + @Temporal(TemporalType.TIMESTAMP) + private Date lastUpdated; + + public AsyncJobJoinMapVO() { + created = DateUtil.currentGMTTime(); + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public long getJobId() { + return jobId; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } + + public long getJoinJobId() { + return joinJobId; + } + + public void setJoinJobId(long joinJobId) { + this.joinJobId = joinJobId; + } + + public int getJoinStatus() { + return joinStatus; + } + + public void setJoinStatus(int joinStatus) { + this.joinStatus = joinStatus; + } + + public String getJoinResult() { + return joinResult; + } + + public void setJoinResult(String joinResult) { + this.joinResult = joinResult; + } + + public long getJoinMsid() { + return joinMsid; + } + + public void setJoinMsid(long joinMsid) { + this.joinMsid = joinMsid; + } + + public Long getCompleteMsid() { + return completeMsid; + } + + public void setCompleteMsid(Long completeMsid) { + this.completeMsid = completeMsid; + } + + public Date getCreated() { + return created; + } + + public void setCreated(Date created) { + this.created = created; + } + + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java new file mode 100644 index 0000000..c4e1105 --- /dev/null +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDao.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async.dao; + +import com.cloud.async.AsyncJobJoinMapVO; +import com.cloud.utils.db.GenericDao; + +public interface AsyncJobJoinMapDao extends GenericDao<AsyncJobJoinMapVO, Long> { + Long joinJob(long jobId, long joinJobId, long joinMsid); + void disjoinJob(long jobId, long joinedJobId); + AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId); + void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java new file mode 100644 index 0000000..0da2968 --- /dev/null +++ b/server/src/com/cloud/async/dao/AsyncJobJoinMapDaoImpl.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async.dao; + +import java.util.List; + +import com.cloud.async.AsyncJobJoinMapVO; +import com.cloud.async.AsyncJobResult; +import com.cloud.utils.DateUtil; +import com.cloud.utils.db.GenericDaoBase; +import com.cloud.utils.db.SearchBuilder; +import com.cloud.utils.db.SearchCriteria; +import com.cloud.utils.db.UpdateBuilder; +import com.cloud.utils.db.SearchCriteria.Op; + +public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Long> implements AsyncJobJoinMapDao { + + private final SearchBuilder<AsyncJobJoinMapVO> RecordSearch; + private final SearchBuilder<AsyncJobJoinMapVO> CompleteJoinSearch; + + public AsyncJobJoinMapDaoImpl() { + RecordSearch = createSearchBuilder(); + RecordSearch.and("jobId", RecordSearch.entity().getJobId(), Op.EQ); + RecordSearch.and("joinJobId", RecordSearch.entity().getJoinJobId(), Op.EQ); + RecordSearch.done(); + + CompleteJoinSearch = createSearchBuilder(); + CompleteJoinSearch.and("joinJobId", CompleteJoinSearch.entity().getJoinJobId(), Op.EQ); + CompleteJoinSearch.done(); + } + + public Long joinJob(long jobId, long joinJobId, long joinMsid) { + AsyncJobJoinMapVO record = new AsyncJobJoinMapVO(); + record.setJobId(jobId); + record.setJoinJobId(joinJobId); + record.setJoinMsid(joinMsid); + record.setJoinStatus(AsyncJobResult.STATUS_IN_PROGRESS); + + this.persist(record); + return record.getId(); + } + + public void disjoinJob(long jobId, long joinedJobId) { + SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create(); + sc.setParameters("jobId", jobId); + sc.setParameters("joinJobId", joinedJobId); + + this.expunge(sc); + } + + public AsyncJobJoinMapVO getJoinRecord(long jobId, long joinJobId) { + SearchCriteria<AsyncJobJoinMapVO> sc = RecordSearch.create(); + sc.setParameters("jobId", jobId); + sc.setParameters("joinJobId", joinJobId); + + List<AsyncJobJoinMapVO> result = this.listBy(sc); + if(result != null && result.size() > 0) { + assert(result.size() == 1); + return result.get(0); + } + + return null; + } + + public void completeJoin(long joinJobId, int joinStatus, String joinResult, long completeMsid) { + AsyncJobJoinMapVO record = createForUpdate(); + record.setJoinStatus(joinStatus); + record.setJoinResult(joinResult); + record.setCompleteMsid(completeMsid); + record.setLastUpdated(DateUtil.currentGMTTime()); + + UpdateBuilder ub = getUpdateBuilder(record); + + SearchCriteria<AsyncJobJoinMapVO> sc = CompleteJoinSearch.create(); + sc.setParameters("joinJobId", joinJobId); + update(ub, sc, null); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/AsyncJobTestConfiguration.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/async/AsyncJobTestConfiguration.java b/server/test/com/cloud/async/AsyncJobTestConfiguration.java index 97345dd..12ee2f8 100644 --- a/server/test/com/cloud/async/AsyncJobTestConfiguration.java +++ b/server/test/com/cloud/async/AsyncJobTestConfiguration.java @@ -23,6 +23,8 @@ import org.springframework.context.annotation.Configuration; import com.cloud.api.ApiDispatcher; import com.cloud.async.dao.AsyncJobDao; import com.cloud.async.dao.AsyncJobDaoImpl; +import com.cloud.async.dao.AsyncJobJoinMapDao; +import com.cloud.async.dao.AsyncJobJoinMapDaoImpl; import com.cloud.async.dao.AsyncJobJournalDao; import com.cloud.async.dao.AsyncJobJournalDaoImpl; import com.cloud.async.dao.SyncQueueDao; @@ -105,4 +107,9 @@ public class AsyncJobTestConfiguration { public AsyncJobJournalDao asyncJobJournalDao() { return new AsyncJobJournalDaoImpl(); } + + @Bean + public AsyncJobJoinMapDao asyncJobJoinMapDao() { + return new AsyncJobJoinMapDaoImpl(); + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/SyncQueueTestConfiguration.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/async/SyncQueueTestConfiguration.java b/server/test/com/cloud/async/SyncQueueTestConfiguration.java index 114fe39..424a3d6 100644 --- a/server/test/com/cloud/async/SyncQueueTestConfiguration.java +++ b/server/test/com/cloud/async/SyncQueueTestConfiguration.java @@ -20,6 +20,8 @@ import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.cloud.async.dao.AsyncJobJoinMapDao; +import com.cloud.async.dao.AsyncJobJoinMapDaoImpl; import com.cloud.async.dao.SyncQueueDao; import com.cloud.async.dao.SyncQueueDaoImpl; import com.cloud.async.dao.SyncQueueItemDao; @@ -54,5 +56,10 @@ public class SyncQueueTestConfiguration { public ConfigurationManager configurationManager() { return Mockito.mock(ConfigurationManager.class); } + + @Bean + public AsyncJobJoinMapDao joinMapDao() { + return new AsyncJobJoinMapDaoImpl(); + } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/server/test/com/cloud/async/TestAsyncJobManager.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java index 1b78557..5f99750 100644 --- a/server/test/com/cloud/async/TestAsyncJobManager.java +++ b/server/test/com/cloud/async/TestAsyncJobManager.java @@ -38,6 +38,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.cloud.async.AsyncJobJournalVO; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobMonitor; +import com.cloud.async.dao.AsyncJobJoinMapDao; import com.cloud.async.dao.AsyncJobJournalDao; import com.cloud.cluster.ClusterManager; import com.cloud.utils.Predicate; @@ -53,6 +54,7 @@ public class TestAsyncJobManager extends TestCase { @Inject MessageBus messageBus; @Inject AsyncJobMonitor jobMonitor; @Inject AsyncJobJournalDao journalDao; + @Inject AsyncJobJoinMapDao joinMapDao; @Before public void setUp() { @@ -111,6 +113,36 @@ public class TestAsyncJobManager extends TestCase { } @Test + public void testJoinMapDao() { + joinMapDao.joinJob(2, 1, 100); + joinMapDao.joinJob(3, 1, 100); + + AsyncJobJoinMapVO record = joinMapDao.getJoinRecord(2, 1); + Assert.assertTrue(record != null); + Assert.assertTrue(record.getJoinMsid() == 100); + Assert.assertTrue(record.getJoinStatus() == AsyncJobResult.STATUS_IN_PROGRESS); + + joinMapDao.completeJoin(1, AsyncJobResult.STATUS_SUCCEEDED, "Done", 101); + + record = joinMapDao.getJoinRecord(2, 1); + Assert.assertTrue(record != null); + Assert.assertTrue(record.getJoinMsid() == 100); + Assert.assertTrue(record.getJoinStatus() == AsyncJobResult.STATUS_SUCCEEDED); + Assert.assertTrue(record.getJoinResult().equals("Done")); + Assert.assertTrue(record.getCompleteMsid() == 101); + + record = joinMapDao.getJoinRecord(3, 1); + Assert.assertTrue(record != null); + Assert.assertTrue(record.getJoinMsid() == 100); + Assert.assertTrue(record.getJoinStatus() == AsyncJobResult.STATUS_SUCCEEDED); + Assert.assertTrue(record.getJoinResult().equals("Done")); + Assert.assertTrue(record.getCompleteMsid() == 101); + + joinMapDao.disjoinJob(2, 1); + joinMapDao.disjoinJob(3, 1); + } + + @Test public void testWaitAndCheck() { Thread thread = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/cloudstack/blob/006b2f8e/setup/db/db/schema-410to420.sql ---------------------------------------------------------------------- diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql index c615c4f..87f1cc7 100644 --- a/setup/db/db/schema-410to420.sql +++ b/setup/db/db/schema-410to420.sql @@ -442,3 +442,23 @@ CREATE TABLE `cloud`.`async_job_journal` ( PRIMARY KEY (`id`), CONSTRAINT `fk_async_job_journal__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE `cloud`.`async_job_join_map` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `job_id` bigint unsigned NOT NULL, + `join_job_id` bigint unsigned NOT NULL, + `join_status` int NOT NULL, + `join_result` varchar(1024), + `join_msid` bigint, + `complete_msid` bigint, + `created` datetime NOT NULL, + `last_updated` datetime, + PRIMARY KEY (`id`), + CONSTRAINT `fk_async_job_join_map__job_id` FOREIGN KEY (`job_id`) REFERENCES `async_job`(`id`) ON DELETE CASCADE, + CONSTRAINT `fk_async_job_join_map__join_job_id` FOREIGN KEY (`join_job_id`) REFERENCES `async_job`(`id`), + CONSTRAINT `fk_async_job_join_map__join` UNIQUE (`job_id`, `join_job_id`), + INDEX `i_async_job_join_map__join_job_id`(`join_job_id`), + INDEX `i_async_job_join_map__created`(`created`), + INDEX `i_async_job_join_map__last_updated`(`last_updated`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +