On Sun, 17 May 2009 21:34:58 -0700 (PDT), Allen Fowler <allen.fow...@yahoo.com> wrote:
>
>Hello,
>
>I have several CGI and cron scripts that I would like to coordinate via a
>"First In / First Out" style buffer. That is, some processes are adding work
>units, and some take the oldest and start work on them.
>
>Could SQLite be used for this?
>
>It would seem very complex to use SQL for just a FIFO, but then again, SQLite
>would take care of all ACID / concurrency issues.
>
>Has this been done before?
>
>Thanks,
>:)

For what it's worth, here you go.
Perhaps you can borrow a few ideas from it.

================= mkschema.sql =====================
--
-- schema for database job.db3
--
-- Tables : statustext (status code -> label), jobs (the queue), pars (job parameters)
-- Views  : reporting views for scripts/web pages, plus dispatch_job / get_init_prop,
--          which the dispatcher uses to claim and read the next job.
--
PRAGMA page_size=8192;
PRAGMA default_cache_size=512;

-- Lookup table: one row per job status code.
CREATE TABLE statustext (
    status CHAR(1) PRIMARY KEY DEFAULT NULL
        CONSTRAINT sttxt_valid_status
        CHECK (status IN ('W','I','R','T','A','C')),
    sttext VARCHAR(16)
);
INSERT INTO statustext (status,sttext) VALUES ('W','Wait');
INSERT INTO statustext (status,sttext) VALUES ('I','Initializing');
INSERT INTO statustext (status,sttext) VALUES ('R','Running');
INSERT INTO statustext (status,sttext) VALUES ('T','Terminated');
INSERT INTO statustext (status,sttext) VALUES ('A','Abend');
INSERT INTO statustext (status,sttext) VALUES ('C','Cancelled'); -- cancelled before dispatched

-- The queue itself. jobid gives the FIFO order; TSN is a 4-character
-- serial derived from jobid by trigger jobs_ins.
CREATE TABLE jobs (
    jobid    INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    TSN      CHAR(4),
    jobprio  INTEGER DEFAULT 9
        CONSTRAINT jobs_valid_prio
        CHECK (jobprio > 0 AND jobprio < 10),
    status   CHAR(1) DEFAULT 'W'
        CONSTRAINT jobs_valid_status
        CHECK (status IN ('W','I','R','T','A','C')),
    userid   VARCHAR(8) NOT NULL,
    dtcreate DATETIME DEFAULT CURRENT_TIMESTAMP,
    dtinit   DATETIME DEFAULT NULL,
    dtstart  DATETIME DEFAULT NULL,
    dtstop   DATETIME DEFAULT NULL,
    dtmodify DATETIME DEFAULT CURRENT_TIMESTAMP,
    dtdnload DATETIME DEFAULT NULL,
    cmnd     VARCHAR(254), -- cmnd\*.cmd to execute
    pars     VARCHAR(254), -- parameters for procedure (host,userid,filename[,type,elementname])
    rc       INTEGER,      -- ERRORLEVEL
    endmsg   VARCHAR(254), -- message from dispatcher
    sysout   VARCHAR(254), -- logfile
    dnload   VARCHAR(254)  -- file to download
);
CREATE INDEX idx_jobs_tsn ON jobs(TSN);

-- Optional parameter lines belonging to a job.
CREATE TABLE pars (
    parid INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    jobid INTEGER
        CONSTRAINT fk_pars_jobs
        REFERENCES jobs (jobid) ON DELETE CASCADE,
    partx TEXT
);

-- After each insert: derive the TSN (last four digits of jobid) and purge
-- jobs that are 10000 or more ids older than the new one.
CREATE TRIGGER jobs_ins AFTER INSERT ON jobs FOR EACH ROW
BEGIN
    UPDATE jobs
       SET TSN = substr(10000000 + NEW.jobid,5,4)
     WHERE jobid = NEW.jobid;
    -- The purge used to carry "AND TSN <= NEW.TSN". NEW.TSN is the value
    -- supplied by the INSERT statement (NULL, the TSN is only assigned by the
    -- UPDATE above), so that comparison was never true and nothing was ever
    -- purged. Keeping only the newest 10000 jobids is what guarantees that a
    -- TSN identifies a single row.
    DELETE FROM jobs
     WHERE jobid < (NEW.jobid - 9999);
END;

-- After each update: maintain the lifecycle timestamps.
--   W -> I     : clear rc/endmsg, stamp dtinit
--   I -> R     : stamp dtstart
--   R -> A / T : stamp dtstop
--   any status change : stamp dtmodify
CREATE TRIGGER jobs_upd AFTER UPDATE ON jobs FOR EACH ROW
BEGIN
    UPDATE jobs SET
        rc       = CASE WHEN OLD.status == 'W' AND NEW.status == 'I'
                        THEN NULL ELSE NEW.rc END,
        endmsg   = CASE WHEN OLD.status == 'W' AND NEW.status == 'I'
                        THEN NULL ELSE NEW.endmsg END,
        dtinit   = CASE WHEN OLD.status == 'W' AND NEW.status == 'I'
                        THEN CURRENT_TIMESTAMP ELSE OLD.dtinit END,
        dtstart  = CASE WHEN OLD.status == 'I' AND NEW.status == 'R'
                        THEN CURRENT_TIMESTAMP ELSE OLD.dtstart END,
        dtstop   = CASE WHEN OLD.status == 'R' AND (NEW.status == 'A' OR NEW.status == 'T')
                        THEN CURRENT_TIMESTAMP ELSE OLD.dtstop END,
        dtmodify = CASE WHEN NEW.status = OLD.status
                        THEN dtmodify ELSE CURRENT_TIMESTAMP END
    WHERE jobid = NEW.jobid;
END;

-- on delete cascade
CREATE TRIGGER jobs_del AFTER DELETE ON jobs FOR EACH ROW
BEGIN
    DELETE FROM pars WHERE pars.jobid = OLD.jobid;
END;

-- references
CREATE TRIGGER fki_pars_jobs BEFORE INSERT ON pars FOR EACH ROW
BEGIN
    SELECT RAISE(ROLLBACK, 'insert on table "pars" violates foreign key constraint "fk_pars_jobs(i)"')
     WHERE NEW.jobid IS NOT NULL
       AND (SELECT jobid FROM jobs WHERE jobid = NEW.jobid) IS NULL;
END;

-- references
CREATE TRIGGER fku_pars_jobs BEFORE UPDATE ON pars FOR EACH ROW
BEGIN
    SELECT RAISE(ROLLBACK, 'update on table "pars" violates foreign key constraint "fk_pars_jobs(u)"')
     WHERE NEW.jobid IS NOT NULL
       AND (SELECT jobid FROM jobs WHERE jobid = NEW.jobid) IS NULL;
END;

CREATE VIEW shjobsta AS -- for use in .cmd scripts
SELECT TSN
      ,status
      ,userid
      ,datetime(dtcreate,'localtime') AS spoolin
      ,datetime(dtinit  ,'localtime') AS dispatch
      ,datetime(dtstart ,'localtime') AS logon
      ,datetime(dtstop  ,'localtime') AS logoff
      ,cmnd
      ,pars
      ,rc
      ,endmsg
  FROM jobs
 ORDER BY jobid DESC;

CREATE VIEW show_job_status AS -- information for users
SELECT '<a href="shtsn.php?TSN='|| TSN ||'">'|| TSN ||'</a>' AS TSN
      ,sttext AS status
      ,userid
      ,datetime(dtcreate,'localtime') AS spoolin
      ,    time(dtstart ,'localtime') AS logon
      ,    time(dtstop  ,'localtime') AS logoff
      ,cmnd
      ,rc
      ,endmsg
      ,datetime(dtdnload,'localtime') AS downloaded
  FROM jobs
 INNER JOIN statustext USING (status)
 ORDER BY jobid DESC
 LIMIT 50;

CREATE VIEW show_tsn AS -- information for users, with download links after termination
SELECT TSN
      ,sttext AS status
      ,userid
      ,datetime(dtcreate,'localtime') AS spoolin
      ,datetime(dtinit  ,'localtime') AS dispatch
      ,datetime(dtstart ,'localtime') AS logon
      ,datetime(dtstop  ,'localtime') AS logoff
      ,datetime(dtdnload,'localtime') AS download
      ,cmnd
      ,'<a href="shpar.php?TSN='|| TSN ||'">'|| pars ||'</a>' as parameters
      ,rc
      ,endmsg
      ,CASE WHEN status = 'A' OR status = 'T'
            THEN '<a href="shsysout.php?TSN='|| TSN ||'">'|| sysout ||'</a>'
            ELSE '' END AS sysout
      ,CASE WHEN status = 'T'
            THEN '<a href="getresult/'|| TSN || '.zip">' || dnload || '</a>'
            ELSE '' END AS download_file
  FROM jobs
 INNER JOIN statustext USING (status)
 ORDER BY jobid DESC
 LIMIT 50;

CREATE VIEW show_par AS -- information for users
SELECT TSN,userid,partx AS parameterwaarde
  FROM jobs
 INNER JOIN pars USING (jobid)
 WHERE jobid > (SELECT (SELECT MAX(jobid) FROM jobs) - 50)
 ORDER BY jobid DESC, parid;

CREATE VIEW get_result AS -- download
SELECT TSN
      ,userid
      ,CASE WHEN status = 'T' THEN dnload ELSE '' END AS dnload
  FROM jobs
 ORDER BY jobid DESC;

CREATE VIEW get_sysout AS
SELECT TSN
      ,userid
      ,CASE WHEN status = 'T' THEN sysout ELSE '' END AS sysout
  FROM jobs
 ORDER BY jobid DESC;

-- The job currently claimed (status 'I'), read by the dispatcher right
-- after it has updated dispatch_job.
CREATE VIEW get_init_prop AS
SELECT TSN
      ,status
      ,userid
      ,cmnd
      ,pars
  FROM jobs
 WHERE status == 'I'
 ORDER BY jobprio,TSN
 LIMIT 1;

PRAGMA user_version=2;

-- The next job to run: the waiting job with the lowest jobprio value,
-- oldest TSN first.
DROP VIEW IF EXISTS dispatch_job;
CREATE VIEW dispatch_job AS
SELECT TSN
      ,status
      ,userid
      ,cmnd
      ,pars
  FROM jobs
 WHERE status == 'W'
 ORDER BY jobprio,TSN
 LIMIT 1;

PRAGMA user_version=2;

DROP TRIGGER IF EXISTS upd_init_job;
DROP TRIGGER IF EXISTS upd_dispatch_job;
-- "UPDATE dispatch_job SET status = 'I'" claims the head of the queue.
-- The status guard makes sure only a waiting row is claimed, even if an
-- older row happens to carry the same TSN.
CREATE TRIGGER upd_dispatch_job INSTEAD OF UPDATE ON dispatch_job
WHEN NEW.status = 'I'
BEGIN
    UPDATE jobs
       SET status = 'I'
     WHERE TSN = NEW.TSN
       AND status = 'W';
END;

-- procedure to dispatch a job:
-- BEGIN IMMEDIATE;
-- UPDATE dispatch_job set STATUS = 'I';
-- SELECT * FROM get_init_prop;
-- COMMIT;
--
-- calculate refresh interval of status pages
-- SELECT CURRENT_TIMESTAMP,MAX(dtmodify),
--   strftime('%s',CURRENT_TIMESTAMP),
--   strftime('%s',MAX(dtmodify)),
--   strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
-- FROM jobs;

-- Refresh interval (seconds) for the admin status page, based on the age
-- of the most recent change in the whole queue. "ago" is computed once in
-- the subquery instead of being repeated in every WHEN branch.
CREATE VIEW refreshinterval_admin AS
SELECT CASE
         WHEN ago <    60 THEN   15 -- less than a minute   : every 15 seconds
         WHEN ago <   180 THEN   30 -- less than 3 minutes  : every 30 seconds
         WHEN ago <   360 THEN   60 -- less than 6 minutes  : every minute
         WHEN ago <  1800 THEN  600 -- less than 30 minutes : every 10 mins
         WHEN ago <  3600 THEN 1200 -- less than an hour    : every 20 mins
         WHEN ago < 86400 THEN 3600 -- less than a day      : every hour
         ELSE 7200                  -- else every two hours
       END AS refresh
  FROM (SELECT strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
          FROM jobs);

-- DROP VIEW IF EXISTS refreshinterval_user;
-- Same as refreshinterval_admin, but per user.
CREATE VIEW refreshinterval_user AS
SELECT userid
      ,CASE
         WHEN ago <    60 THEN   15 -- less than a minute   : every 15 seconds
         WHEN ago <   180 THEN   30 -- less than 3 minutes  : every 30 seconds
         WHEN ago <   360 THEN   60 -- less than 6 minutes  : every minute
         WHEN ago <  1800 THEN  600 -- less than 30 minutes : every 10 mins
         WHEN ago <  3600 THEN 1200 -- less than an hour    : every 20 mins
         WHEN ago < 86400 THEN 3600 -- less than a day      : every hour
         ELSE 7200                  -- else every two hours, for users per user
       END AS refresh
  FROM (SELECT userid
              ,strftime('%s',CURRENT_TIMESTAMP) - strftime('%s',MAX(dtmodify)) AS ago
          FROM jobs
         GROUP BY userid);

PRAGMA user_version=4;

============ dispatch.cmd ==============
@echo off
:: Job dispatcher
goto MAIN
:: --------------------------------------
:: level 3 subroutines called by level 2
:: --------------------------------------
:: --
:: Show job status
:: --
:: Note: use quoted string for selects/updates WHERE TSN==
:SHJOBSTA
set SQL=SELECT * FROM shjobsta WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
goto Z
:: --------------------------------------
:: level 2 subroutines called by DISPATCH
:: --------------------------------------
:: --
:: set date and time for display and log
:: --
:GETDT
set MYDATE=%DATE%
set MYTIME=%TIME:~0,8%
set MYTIME=%MYTIME: =0%
goto Z
:: --
:: retrieve a command and its params from the scheduling database
:: --
:GETJOB
echo getjob
:: unset job properties
for %%v in (TSN status dtcreate dtstart dtstop dtmodify userid cmnd pars parlst rc endmsg sysout dnload) do set JOB_%%v=
:: create a stream to set new job properties (set JOB_keyword=value)
:: BEGIN IMMEDIATE takes the write lock before the claim, so the UPDATE and
:: the SELECT that reads the claimed job run as one unit.
set SQL=BEGIN IMMEDIATE;\nUPDATE dispatch_job set STATUS = 'I';\nSELECT * FROM get_init_prop;\nCOMMIT;
%GNU_ECHO% "%SQL_PFX%.mode line\n%SQL%" | %SQLITE% %PAR_SDB% | %GNU_AWK% "{gsub(/^[[:blank:]]+/,\"\");gsub(/[[:blank:]]=[[:blank:]]/,\"=\");print \"set JOB_\" $0}" >%TMPDIR%\SSN#%PAR_SSN%.bat
if errorlevel 1 goto SQLERR
call %TMPDIR%\SSN#%PAR_SSN%.bat
del %TMPDIR%\SSN#%PAR_SSN%.bat
if "%JOB_TSN%"=="" goto QEMPTY
if "%JOB_TSN%"=="%PRV_TSN%" goto DUPTSN
set PRV_TSN=%JOB_TSN%
set JOB_rc=0
:: Retrieve parameter list from database
set JOB_parlst=%TMPDIR%/%JOB_TSN%.par
set JOB_dnload=%TMPDIR%/%JOB_TSN%.zip
if exist "%TMPDIR%\%JOB_TSN%.zip" del "%TMPDIR%\%JOB_TSN%.zip"
set SQL=SELECT partx FROM pars INNER JOIN jobs USING (jobid) WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%.mode list\n%SQL%" | %SQLITE% %PAR_SDB% >%JOB_parlst%
if errorlevel 1 goto SQLERR
goto Z
:DUPTSN
echo Duplicate TSN %JOB_TSN%, stop dispatching
set JOB_TSN=
:QEMPTY
echo No more waiting jobs, last job was %PRV_TSN%.
goto Z
:: --
:: execute a job
:: --
:EXEJOB
@echo off
echo exejob %JOB_TSN%
set SQL=UPDATE jobs SET status = 'R' WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set JOB_endmsg=Undefined cmnd
if not exist cmnd\%JOB_cmnd%.cmd goto :CMDNF
call :SHJOBSTA >"%JOB_sysout%" 2>&1
set JOB_endmsg=Error in `%JOB_cmnd%.cmd`
@echo on
@call cmnd\%JOB_cmnd%.cmd %JOB_pars% >>"%JOB_sysout%" 2>&1
set JOB_rc=%ERRORLEVEL%
if "%JOB_rc%"=="" set JOB_rc=0
@echo off
if "%JOB_rc%"=="0" goto NORMEND
if "%JOB_rc%"=="1" goto WARNING
if "%JOB_rc%"=="2" goto WARNING
if "%JOB_rc%"=="3" goto WARNING
if "%JOB_rc%"=="4" goto WARNING
if "%JOB_rc%"=="5" goto WARNING
if "%JOB_rc%"=="6" goto WARNING
if "%JOB_rc%"=="7" goto WARNING
if "%JOB_rc%"=="8" goto WARNING
if "%JOB_rc%"=="9" goto WARNING
goto ABEND
:: No error or just a warning (1)
:WARNING
set JOB_endmsg=%JOB_endmsg% (with warnings)
:NORMEND
if "%JOB_endmsg%"=="" set JOB_endmsg=Normal end
echo %JOB_endmsg%
if NOT exist %JOB_dnload% set JOB_dnload=
set SQL=UPDATE jobs SET status='T', rc='%JOB_rc%', endmsg='%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END, dnload= CASE WHEN '%JOB_dnload%'=='' THEN NULL ELSE '%JOB_dnload:tmp/=%' END WHERE TSN=='%JOB_TSN%' AND status=='R';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA >>"%JOB_sysout%" 2>&1
if exist "%TMPDIR%\%JOB_TSN%.par" del "%TMPDIR%\%JOB_TSN%.par"
goto Z
:CMDNF
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No procedure found to handle command '%JOB_cmnd%'
:: set errorlevel without having to use exit /b
%GNU_AWK% "BEGIN{exit 253}"
:: capture it right away: the next external command overwrites ERRORLEVEL,
:: and without this the abend would be recorded with rc 0
set JOB_rc=%ERRORLEVEL%
:ABEND
@echo off
if "%JOB_endmsg%"=="" set JOB_endmsg=No reason specified
echo %MYDATE% %MYTIME% %JOB_TSN% Abend # %JOB_rc% `%JOB_endmsg%`
:: "set" was missing here, so %SQL% still held the previous statement and
:: status 'A' was never written
set SQL=UPDATE jobs SET status = 'A', rc = '%JOB_rc%', endmsg = '%JOB_endmsg%', sysout = CASE WHEN '%JOB_sysout%'=='' THEN NULL ELSE '%JOB_sysout:log/=%' END WHERE TSN=='%JOB_TSN%';
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
call :SHJOBSTA
:: The user proc may have failed, but the dispatcher is healthy
exit /B 0
:SQLERR
echo Error during SQL processing, can't continue
echo Offending statement:
echo %SQL%
exit /B 3
:: ======================================
:: level 1 subroutines called by MAIN
:: ======================================
:: --
:: Dispatch a job (fetch from queue, execute)
:: --
:DISPATCH
:DISPNEXT
call :GETJOB
if errorlevel 1 goto R
call :GETDT
if "%JOB_TSN%"=="" goto DISPDONE
set JOB_sysout=log/%JOB_TSN%.txt
PROMPT $$$S
call :EXEJOB
if errorlevel 1 goto R
PROMPT $P$G
call :GETDT
echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%
echo %MYDATE% %MYTIME% 8 %0 %PAR_SSN% %JOB_TSN% %JOB_rc% %JOB_endmsg%>>log\log.txt
goto DISPNEXT
:DISPDONE
call :GETDT
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN%
echo %MYDATE% %MYTIME% 9 %0 %PAR_SSN% >>log\log.txt
goto Z
::
:: Reset a TSN from status A to W
:RESET
set SQL=UPDATE jobs SET status = 'W' WHERE TSN=='%1' AND status IN ('A','I','T','R');
%GNU_ECHO% "%SQL_PFX%%SQL%" | %SQLITE% %PAR_SDB%
if errorlevel 1 goto SQLERR
set PAR_RESTART=
goto Z
:: ====================================
:: level 0 MAIN entrypoint
:: dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
::
:: ====================================
:MAIN
call \data\opt\cfg\setenv.cmd
call \data\opt\cfg\setdir.cmd
set SQL_PFX=.echo off\n.bail on\n.timeout 1000\n
cd /D %0\..
call \data\opt\cfg\%cfg%\setdrives.cmd >log\drives.txt
if errorlevel 1 goto P01
:: reset all possible parameters
for %%p in (RESTART SDB SSN) do set PAR_%%p=
:: set defaults
:: - schedule serial number (TSN is fetched from the job database)
set PAR_SSN=@@@@
:: - job database
set PAR_SDB=%DRIV6%\data\opt\db\li\job.db3
:GETPAR
if "%1"=="" goto PROCESS
if "%2"=="" goto P02
set PAR_%1=%2
shift
shift
goto GETPAR
:PROCESS
set PAR_ >log\SSN#%PAR_SSN%.txt
set DRIV >>log\SSN#%PAR_SSN%.txt
if DEFINED PAR_RESTART call :RESET %PAR_RESTART% >>log\SSN#%PAR_SSN%.txt
set PRV_TSN=@@@@
:: MYDATE / MYTIME are only set by GETDT; call it before the first log line
call :GETDT
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\log.txt
echo %MYDATE% %MYTIME% 1 %0 %PAR_SSN% %PAR_SDB% >>log\SSN#%PAR_SSN%.txt
:: dispatcher loop, one task at a time
call :DISPATCH >>log\SSN#%PAR_SSN%.txt 2>&1
if "%PAR_SSN%"=="@@@@" goto Z
@cls
@exit /b 0
:: MAIN Environment and Parameter errors
:P01
echo Can't get all required driveletters.
goto R
:P02
echo Parameters must be specified as pairs 'name value' or 'name=value'
echo dispatch [RESTART=TSN#] [SSN=SSN#] [SDB=jobdbpathfile]
goto R
:: Dispatcher errors
:R
PROMPT $P$G
exit /B 1
:Z
@echo off

======= php fragment to create a job with or without parameters ====
======= it's part of a class which extends PDO ====
======= I prefer php_pdo_sqlite_external ====
/**
 * Queue a job, optionally with a list of parameter lines.
 *
 * The job row and its parameter rows are written in one transaction; if
 * anything throws, the transaction is rolled back and the exception is
 * re-thrown, so no half-written job is left behind.
 *
 * @param string $userid   owner of the job
 * @param string $cmnd     name of the cmnd\*.cmd procedure to run
 * @param mixed  $parlist  null, an array of strings, or a statement/result
 *                         object whose first column holds the parameters
 * @param mixed  $ntuid    not used in this fragment
 * @param mixed  $ntpsw    not used in this fragment
 * @param int    $jobprio  priority 1..9 (enforced by the jobs table)
 * @return string the TSN, computed the same way as trigger jobs_ins does
 */
function enter_job($userid,$cmnd,$parlist,$ntuid,$ntpsw,$jobprio = 8){
    /* Execute a prepared statement by passing an array of values --> */
    $sql = 'INSERT INTO jobs (userid,cmnd,pars,jobprio) VALUES (:userid,:cmnd,:pars,:jobprio)';
    $stjob = $this->prepare($sql);
    $this->beginTransaction();
    try {
        $stjob->execute(array(':userid'  => $userid,
                              ':cmnd'    => $cmnd,
                              ':pars'    => '@list',
                              ':jobprio' => $jobprio));
        /*
         * The TSN is the last four digits of the job id.
         * 10000000 + jobid must stay an 8 digit number for the substr
         * offsets below (and in trigger jobs_ins) to hit those digits,
         * so this accommodates up to 89 999 999 requests;
         * then we have to reset by deleting the database.
         * It will be rebuilt automatically
         *          01234567                      oO0
         */
        $jobid = $this->lastInsertId();
        $tsn = substr((string)(10000000 + (int)$jobid),4,4);
        if (isset($parlist)){
            /* there are parameters */
            $sql = 'INSERT INTO pars (jobid,partx) VALUES (:jobid,:partx)';
            $stpar = $this->prepare($sql);
            if (is_array($parlist)){
                /* we got a text array with params */
                foreach ($parlist as $aval){
                    $stpar->execute(array(':jobid' => $jobid, ':partx' => $aval));
                }
            } else if (is_object($parlist)) {
                /* we got a resultset from a query as paramlist */
                while ($row = $parlist->fetch(PDO::FETCH_NUM)){
                    $stpar->execute(array(':jobid' => $jobid, ':partx' => $row[0]));
                }
            }
        }
        $this->commit();
    } catch (Exception $e) {
        /* do not leave a job behind without its parameters */
        $this->rollBack();
        throw $e;
    }
    /*
     * ugly code to launch the dispatcher asynchronously using
     * Windows schtasks.exe is left to the imagination of the reader
     */
    return $tsn;
}

--
  (  Kees Nuyt
  )
c[_]
_______________________________________________
sqlite-users mailing list
sqlite-users@sqlite.org
http://sqlite.org:8080/cgi-bin/mailman/listinfo/sqlite-users