Author: simoneg Date: Thu Nov 27 08:32:00 2008 New Revision: 721230 URL: http://svn.apache.org/viewvc?rev=721230&view=rev Log: LABS-242 : Simple implementation of the scheduler
Added: labs/magma/trunk/jobs-simple/pom.xml labs/magma/trunk/jobs-simple/src/ labs/magma/trunk/jobs-simple/src/main/ labs/magma/trunk/jobs-simple/src/main/java/ labs/magma/trunk/jobs-simple/src/main/java/org/ labs/magma/trunk/jobs-simple/src/main/java/org/apache/ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/InstallSimpleScheduler.aj labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobData.java labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobExecutingThread.java labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/SimpleScheduler.java labs/magma/trunk/jobs-simple/src/main/resources/ labs/magma/trunk/jobs-simple/src/test/ labs/magma/trunk/jobs-simple/src/test/java/ labs/magma/trunk/jobs-simple/src/test/java/org/ labs/magma/trunk/jobs-simple/src/test/java/org/apache/ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulerRunTest.java labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulingTest.java labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/StupidJob.java labs/magma/trunk/jobs-simple/src/test/resources/ Added: labs/magma/trunk/jobs-simple/pom.xml URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/pom.xml?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/pom.xml (added) +++ labs/magma/trunk/jobs-simple/pom.xml Thu Nov 27 08:32:00 2008 @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <parent> + <artifactId>magma-parent</artifactId> + <groupId>org.apache.magma</groupId> + <version>1</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.magma</groupId> + <artifactId>jobs-simple</artifactId> + <name>Magma Simple Jobs system</name> + <version>0.0.1-SNAPSHOT</version> + <description/> + <packaging>magma</packaging> + <dependencies> + <dependency> + <groupId>org.apache.magma</groupId> + <artifactId>foundation-jobs</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.magma</groupId> + <artifactId>foundation-conversion</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.5</version> + </dependency> + </dependencies> +</project> \ No newline at end of file Added: labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/InstallSimpleScheduler.aj URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/InstallSimpleScheduler.aj?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/InstallSimpleScheduler.aj (added) +++ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/InstallSimpleScheduler.aj Thu Nov 27 08:32:00 2008 @@ -0,0 +1,20 @@ +package org.apache.magma.jobs.simple; + +import org.apache.magma.jobs.Scheduler; + +public aspect InstallSimpleScheduler { + + private SimpleScheduler current = null; + + Scheduler around() : call(Scheduler.new()) { + if (current == null) { + current = new SimpleScheduler(); + Thread t = new Thread(current); + t.setDaemon(true); + t.setName("SimpleScheduler"); + t.start(); + } + return current; + } + +} Added: labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobData.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobData.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobData.java (added) +++ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobData.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,137 @@ +package org.apache.magma.jobs.simple; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.magma.basics.MagmaException; +import org.apache.magma.beans.BeanData; +import org.apache.magma.beans.BeanHandler; +import org.apache.magma.beans.PropertyInfo; +import org.apache.magma.jobs.Job; +import org.apache.magma.jobs.Report; +import org.apache.magma.jobs.SimpleTrigger; +import org.apache.magma.jobs.JobStatus; + +public class JobData implements JobStatus { + + private Class<? extends Job> jobClass = null; + private Map<String,Object> binding = new HashMap<String, Object>(); + private long lastExecuted = 0; + private SimpleTrigger trigger = null; + private Report lastReport = null; + private boolean running = false; + private int id = System.identityHashCode(this); + private String name = "Unknown"; + + public JobData(Job job) { + this.jobClass = job.getClass(); + BeanHandler handler = job.handler(); + BeanData data = job.beanData(); + Set<String> names = data.getPropertyNames(); + for (String propname : names) { + PropertyInfo prop = data.getProperty(propname); + if (prop.isReadable() && prop.isWriteable()) { + Object val = null; + if (handler.isConverted(propname)) { + val = handler.getStringValue(propname); + } else { + val = handler.getValue(propname); + } + binding.put(propname, val); + } + } + } + + public long getLastExecuted() { + return lastExecuted; + } + + public void setLastExecuted(long lastExecuted) { + this.lastExecuted = lastExecuted; + } + + public Date getLastRun() { + return new Date(this.lastExecuted); + } + + public SimpleTrigger getTrigger() { + return trigger; + } + + public void setTrigger(SimpleTrigger trigger) { + this.trigger = trigger; + } + + public Class<? extends Job> getJobClass() { + return jobClass; + } + + public Map<String, Object> getParameters() { + return binding; + } + + public Job createInstance() { + try { + Job job = this.jobClass.newInstance(); + BeanHandler handler = job.handler(); + BeanData data = job.beanData(); + Set<String> names = data.getPropertyNames(); + for (String propname : names) { + PropertyInfo prop = data.getProperty(propname); + if (prop.isReadable() && prop.isWriteable()) { + Object val = binding.get(propname); + if (val instanceof String) { + handler.setStringValue(propname, (String)val); + } else { + handler.setValue(propname, val); + } + } + } + handler.commit(); + return job; + } catch (Exception e) { + throw new MagmaException(e, "Error initializing and starting job of class {0}", this.jobClass); + } + } + + public boolean matches(JobData other) { + return + other.jobClass.equals(this.jobClass) && + other.trigger.getClass().equals(this.trigger.getClass()) && + other.trigger.equals(this.trigger); + } + + public long getNextExecutionTime() { + return lastExecuted + this.trigger.getEveryMillis(); + } + + public Report getReport() { + return lastReport; + } + + public void setLastReport(Report lastReport) { + this.lastReport = lastReport; + } + + public boolean isRunning() { + return running; + } + + public void setRunning(boolean running) { + this.running = running; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} Added: labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobExecutingThread.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobExecutingThread.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobExecutingThread.java (added) +++ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/JobExecutingThread.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,59 @@ +package org.apache.magma.jobs.simple; + +import org.apache.magma.beans.BeanHandler; +import org.apache.magma.jobs.Job; +import org.apache.magma.jobs.Report; + +public class JobExecutingThread extends Thread { + + private Job job = null; + private Report report = new Report(); + private JobData data = null; + private SimpleScheduler scheduler = null; + + public JobExecutingThread(Job job, SimpleScheduler scheduler) { + this.job = job; + this.setName("Job " + job.toString()); + this.setDaemon(true); + this.scheduler = scheduler; + } + + @Override + public synchronized void start() { + report.started(); + super.start(); + } + + public void run() { + if (job.beanData().getProperty("report") != null) { + BeanHandler handler = job.handler(); + handler.setValue("report", report); + handler.commit(); + } + try { + job.run(); + } catch (Throwable t) { + report.error(t); + } finally { + report.finished(); + synchronized (scheduler) { + scheduler.notifyAll(); + } + } + } + + public Report getReport() { + return report; + } + + public JobData getData() { + return data; + } + + public void setData(JobData data) { + this.data = data; + if (data != null) data.setLastReport(this.report); + } + + +} Added: labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/SimpleScheduler.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/SimpleScheduler.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/SimpleScheduler.java (added) +++ labs/magma/trunk/jobs-simple/src/main/java/org/apache/magma/jobs/simple/SimpleScheduler.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,191 @@ +package org.apache.magma.jobs.simple; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.magma.basics.MagmaException; +import org.apache.magma.jobs.Job; +import org.apache.magma.jobs.JobStatus; +import org.apache.magma.jobs.Scheduler; +import org.apache.magma.jobs.SimpleTrigger; +import org.apache.magma.jobs.Trigger; + +public class SimpleScheduler extends Scheduler implements Runnable { + + private List<JobData> datas = new ArrayList<JobData>(); + private List<JobExecutingThread> running = new ArrayList<JobExecutingThread>(); + private List<JobStatus> finisheds = new ArrayList<JobStatus>(); + + private int maxThreads = 4; + + + @Override + public JobStatus run(String name, Job job) { + return internalRun(name, job); + } + + protected void internalRun(JobData data) { + JobExecutingThread jt = new JobExecutingThread(data.createInstance(), this); + setupThread(data, jt); + } + + private void setupThread(JobData data, JobExecutingThread jt) { + jt.setData(data); + data.setLastExecuted(System.currentTimeMillis()); + data.setRunning(true); + synchronized (this.running) { + jt.start(); + this.running.add(jt); + } + synchronized (this) { + this.notifyAll(); + } + } + + protected JobData internalRun(String name, Job job) { + JobData jd = new JobData(job); + jd.setName(name); + JobExecutingThread jt = new JobExecutingThread(job, this); + setupThread(jd, jt); + return jd; + } + + @Override + public void schedule(String name, Job job, Trigger trigger) { + if (!(trigger instanceof SimpleTrigger)) throw new MagmaException("The simple scheduler is only able to handle SimpleTrigger"); + SimpleTrigger strig = (SimpleTrigger) trigger; + JobData jd = new JobData(job); + jd.setName(name); + jd.setTrigger(strig); + synchronized (this.datas) { + datas.add(jd); + } + synchronized (this) { + this.notifyAll(); + } + } + + public long check() { + synchronized (this.running) { + for (Iterator<JobExecutingThread> iterator = this.running.iterator(); iterator.hasNext();) { + JobExecutingThread thread = iterator.next(); + if (!thread.getReport().isRunning()) { + System.out.println("Finished " + thread); + iterator.remove(); + if (!datas.contains(thread.getData())) { + if (finisheds.size() > 10) finisheds.remove(0); + finisheds.add(thread.getData()); + } + thread.getData().setRunning(false); + synchronized (this) { + this.notifyAll(); + } + } + } + } + long cur = System.currentTimeMillis(); + long min = cur + 60000; + synchronized (datas) { + for (JobData data : this.datas) { + if (data.getNextExecutionTime() <= cur) { + if (this.running.size() > maxThreads) { + min = cur + 1000; + } else { + internalRun(data); + } + } + if (min > data.getNextExecutionTime()) min = data.getNextExecutionTime(); + } + } + return min - cur; + } + + public void run() { + while (true) { + long wait = 2000; + try { + wait = check(); + } catch (Throwable t) { + t.printStackTrace(); + } + try { + if (wait <= 0) wait = 1; + synchronized (this) { + System.out.println("Waiting " + wait); + this.wait(wait); + System.out.println("Awake"); + } + } catch (InterruptedException e) { + } + } + } + + @Override + public List<JobStatus> getRunning() { + List<JobStatus> statuses = new ArrayList<JobStatus>(); + synchronized (running) { + for (JobExecutingThread th : this.running) { + if (!datas.contains(th.getData())) { + statuses.add(th.getData()); + } + } + } + return statuses; + } + + @Override + public List<JobStatus> getScheduled() { + List<JobStatus> statuses = new ArrayList<JobStatus>(); + synchronized (datas) { + for (JobStatus jobStatus : this.datas) { + statuses.add(jobStatus); + } + } + return statuses; + } + + @Override + public List<JobStatus> getFinished() { + List<JobStatus> statuses = new ArrayList<JobStatus>(); + synchronized (running) { + for (JobStatus status : this.finisheds) { + statuses.add(status); + } + } + + return statuses; + } + + @Override + public JobStatus getStatus(int id) { + synchronized (datas) { + for (JobStatus jobStatus : this.datas) { + if (jobStatus.getId() == id) return jobStatus; + } + } + synchronized (running) { + for (JobExecutingThread th : this.running) { + if (!datas.contains(th.getData())) { + if(th.getData().getId() == id) return th.getData(); + } + } + for (JobStatus jobStatus : this.finisheds) { + if (jobStatus.getId() == id) return jobStatus; + } + } + return null; + } + + void reset() { + synchronized (this.datas) { + this.datas.clear(); + } + synchronized (this.running) { + this.finisheds.clear(); + } + synchronized (this) { + this.notifyAll(); + } + } +} Added: labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulerRunTest.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulerRunTest.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulerRunTest.java (added) +++ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulerRunTest.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,55 @@ +package org.apache.magma.jobs.simple; + +import java.util.List; + +import org.apache.magma.jobs.JobStatus; +import org.apache.magma.jobs.Scheduler; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.junit.matchers.JUnitMatchers.*; +import static org.hamcrest.CoreMatchers.*; + + +public class SimpleSchedulerRunTest { + + @Test + public void run() throws Exception { + StupidJob.messages.clear(); + Scheduler sched = new Scheduler(); + StupidJob job = new StupidJob(); + job.setMessage("simplerun"); + sched.run("test", job); + synchronized (sched) { + sched.wait(60000); + } + assertThat(StupidJob.messages, hasItem(equalTo("simplerun"))); + } + + @Test + public void multiRun() throws Exception { + StupidJob.messages.clear(); + Scheduler sched = new Scheduler(); + + for (int i = 1; i < 10; i++) { + StupidJob job = new StupidJob(); + job.setMessage("simplerun" + i); + sched.run("test", job); + assertThat("Not found a job in the state for number " + i, sched.getRunning().size(), not(0)); + } + + int cnt = 0; + while (sched.getRunning().size() > 0 && cnt < 20) { + synchronized (sched) { + sched.wait(1500); + } + cnt++; + } + assertThat(cnt, not(20)); + + for (int i = 1; i < 10; i++) { + assertThat(StupidJob.messages, hasItem(equalTo("simplerun" + i))); + } + } + +} Added: labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulingTest.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulingTest.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulingTest.java (added) +++ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/SimpleSchedulingTest.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,49 @@ +package org.apache.magma.jobs.simple; + +import org.apache.magma.jobs.Every; +import org.apache.magma.jobs.FineEvery; +import org.apache.magma.jobs.JobStatus; +import org.apache.magma.jobs.Scheduler; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.junit.matchers.JUnitMatchers.*; +import static org.hamcrest.CoreMatchers.*; + +public class SimpleSchedulingTest { + + + @Test + public void scheduleRepeating() throws Exception { + StupidJob.messages.clear(); + Scheduler sched = new Scheduler(); + ((SimpleScheduler)sched).reset(); + + StupidJob job = new StupidJob(); + job.setMessage("recurring"); + sched.schedule("test", job, new FineEvery(0,0,3)); + + assertThat(sched.getScheduled(), hasItem(not(nullValue(JobStatus.class)))); + + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() < start + 14000) { + synchronized (sched) { + sched.wait(3000); + } + } + + ((SimpleScheduler)sched).reset(); + assertThat(StupidJob.messages.size(), equalTo(5)); + assertThat(StupidJob.messages, everyItem(equalTo("recurring"))); + + int cnt = 0; + while (cnt < 10 && (sched.getScheduled().size() > 0 || sched.getRunning().size() > 0)) { + synchronized (sched) { + sched.wait(1000); + } + cnt++; + } + assertThat("Scheduler didn't remove scheduled task, even if i waited", sched.getScheduled().size(), equalTo(0)); + assertThat("Scheduler didn't remove running task, even if i waited", sched.getRunning().size(), equalTo(0)); + } +} Added: labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/StupidJob.java URL: http://svn.apache.org/viewvc/labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/StupidJob.java?rev=721230&view=auto ============================================================================== --- labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/StupidJob.java (added) +++ labs/magma/trunk/jobs-simple/src/test/java/org/apache/magma/jobs/simple/StupidJob.java Thu Nov 27 08:32:00 2008 @@ -0,0 +1,45 @@ +package org.apache.magma.jobs.simple; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.magma.beans.BeanData; +import org.apache.magma.beans.BeanHandler; +import org.apache.magma.jobs.Job; +import org.apache.magma.jobs.Report; + +public class StupidJob implements Job { + + public static List<String> messages = new ArrayList<String>(); + + private String message = null; + private Report report = null; + + + public void run() { + System.out.println(Thread.currentThread().getName() + " - " + new Date() + " - WAIT"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + messages.add(message); + System.out.println(Thread.currentThread().getName() + " - " + new Date() + "MESSAGED " + message); + report.setState("messaging " + message); + } + + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + + public void setReport(Report report) { + this.report = report; + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]