Kunal,

If you have many more files than CPU cores, use an asynchronous queue
of single-thread loaders (see the LUBM loading procedure in the attachment
for an example). If you have a small number of files, load them one
after another using the multi-threaded loader DB.DBA.RDF_LOAD_RDFXML_MT().

Best Regards,
Ivan Mikhailov,
OpenLink Software.

On Fri, 2008-02-08 at 11:56 -0800, Kunal Patel wrote:
> Hi,
> 
>   I am planning to load the uniprot KB (in RDF) into Virtuoso.  Can I
> know what would be the best way to do that?
> 
> Kunal

--  This file is part of the OpenLink Software Virtuoso Open-Source (VOS)
--  project.
--
--  Copyright (C) 1998-2006 OpenLink Software
--
--  This project is free software; you can redistribute it and/or modify it
--  under the terms of the GNU General Public License as published by the
--  Free Software Foundation; only version 2 of the License, dated June 1991.
--
--  This program is distributed in the hope that it will be useful, but
--  WITHOUT ANY WARRANTY; without even the implied warranty of
--  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
--  General Public License for more details.
--
--  You should have received a copy of the GNU General Public License along
--  with this program; if not, write to the Free Software Foundation, Inc.,
--  51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA


-- Set the checkpoint interval to 100 hours (i.e. the argument is taken to be in minutes:
-- 6000 / 60 = 100) -- enough to complete most experiments.
checkpoint_interval (6000);

-- Uncomment to get records of load levels (for maintainers only).
-- load ldmeter.sql;

-- Statistics table: one row per loaded (or failed) file. It is used solely to collect
-- statistics about loading progress, especially to understand scalability issues.
create table LUBM_LOAD_LOG (
  -- Thread ID; -1 if threads are automatically dispatched by the async queue.
  THREAD integer not null,
  -- Task index, e.g. the serial number of the file in the sequence of created tasks.
  TIDX integer not null,
  -- Time when the loading was started.
  START datetime not null,
  -- Time when the loading was finished.
  FINISH datetime not null,
  -- Source file name.
  FILE varchar not null,
  -- Final SQL state; the loaders below write '00000' for a successful load.
  STATE varchar not null,
  -- Final error message; the loaders below write an empty string for a successful load.
  MSG varchar not null
);

-- One loading -- one set of statistics: rows left over from a previous run are removed.
-- The missing WHERE clause is intentional; the whole table is emptied.
delete from LUBM_LOAD_LOG;

-- There are two methods of loading, for different experiments:
--
-- LUBM_LOAD_MT_1() uses a multithreaded parser in order to load its set of files one after the other.
--   N multithreaded parsers, M loading threads each, can efficiently load about (2 + (N * (1+M))) CPU cores.
--
-- LUBM_LOAD_LOG2() creates an asynchronous queue of single-thread parsers.
--   N single-thread parsers can load about (2 + N) CPU cores.
--
-- Use LUBM_LOAD_LOG2() if you don't want to experiment and simply want to load data.
--
-- The function calls given below are suitable for 8 CPU cores (say, 2 quad-core Xeons), with
-- source files residing in subdirectories 'data-0', 'data-1' and 'data-2' of the working directory
-- that are located on different hard disks (so they're actually symlinks).
-- You may change the number of threads and/or directories as appropriate for your number of CPU cores and HDDs.

-- First method: multithreaded parser.

-- Loads the files of one directory, one after the other, with the multithreaded RDF/XML parser.
--
-- Parameters:
--   thread_no        ID written to LUBM_LOAD_LOG.THREAD to tell parallel invocations apart.
--   path             Directory with the source files.
--   fctr_first       Index (in the directory listing) of the first file to load.
--   fctr_step        Distance between indexes of loaded files, so that several invocations
--                    can stripe one directory between them. Must be positive.
--   parsing_threads  Passed as the last argument of DB.DBA.RDF_LOAD_RDFXML_MT().
--
-- Each processed file produces one row in LUBM_LOAD_LOG: either the SQL state and message
-- of the failure, or state '00000' with an empty message on success.
create procedure LUBM_LOAD_MT_1 (in thread_no integer, in path varchar,
  in fctr_first integer := 0, in fctr_step integer := 1,
  in parsing_threads integer := 1 )
{
  declare dirlist any; -- list of files in the source directory
  declare ctr, len integer; -- file counter and file count
  -- A non-positive step would never move the counter past the end of the list,
  -- so the loop below would never terminate.
  if (fctr_step < 1)
    signal ('22023', 'LUBM_LOAD_MT_1: fctr_step must be a positive integer');
  dirlist := sys_dirlist (path, 1);
  len := length (dirlist);
  for (ctr := fctr_first; ctr < len; ctr := ctr + fctr_step)
    {
      declare sta datetime; -- start time of the current file
      declare failed integer; -- set to 1 by the handler if the current file failed
      failed := 0;
      rollback work;
      sta := now ();
      -- In case of error log it and continue with next file.
      -- This is a CONTINUE handler: execution resumes right after the failed statement,
      -- so the flag is needed to keep the success record below from being written as well.
      declare continue handler for sqlstate '*' {
        failed := 1;
        rollback work;
        insert into LUBM_LOAD_LOG (THREAD, TIDX, START, FINISH, FILE, STATE, MSG)
        values (thread_no, ctr, sta, now(), dirlist[ctr], __SQL_STATE, __SQL_MESSAGE);
        commit work;
        };
      -- Start the multithreaded parser
      DB.DBA.RDF_LOAD_RDFXML_MT (file_to_string_output (path || '/' || dirlist[ctr]),
        'lubm', 'lubm', 0, parsing_threads );
      if (not failed)
        {
          commit work;
          -- Log status
          insert into LUBM_LOAD_LOG (THREAD, TIDX, START, FINISH, FILE, STATE, MSG)
          values (thread_no, ctr, sta, now(), dirlist[ctr], '00000', '');
          commit work;
        }
    }
}
;

-- That's how to launch a group of multithreaded parsers in parallel:

--checkpoint;
---- Note that the procedures are started in the background ('&' instead of ';'), except the last one.
----                    thread IDs to distinguish threads in statistical analysis
----                   /        path to directory with files to load
----                  /        /   load only odd- or only even- numbered files
----                 /        /   /  fctr_step to stripe on odd/even
----                /        /   /  /  number of data loading threads
----               /        /   /  /  /
--LUBM_LOAD_MT_1 (0, 'data-0', 0, 2, 1) &
--LUBM_LOAD_MT_1 (1, 'data-1', 0, 2, 1) &
--LUBM_LOAD_MT_1 (2, 'data-2', 0, 2, 1) &
--LUBM_LOAD_MT_1 (3, 'data-0', 1, 2, 1) &
--LUBM_LOAD_MT_1 (4, 'data-1', 1, 2, 1) &
--LUBM_LOAD_MT_1 (5, 'data-2', 1, 2, 1) ; -- last procedure is started foreground
--WAIT_FOR_CHILDREN;
--checkpoint;
--checkpoint_interval (60);



-- Second method: queue of single-thread parsers

-- This procedure simply loads one file by one thread.
-- The procedure could be as short and simple as a single call of DB.DBA.RDF_LOAD_RDFXML()
-- but we cheat with logging and free-text indexing to get identical behaviour of servers of different versions.
--
-- Parameters:
--   filename  Path of the RDF/XML file to load into graph 'lubm'.
--   ctr       Serial number of the task; it is written to LUBM_LOAD_LOG.TIDX.
--
-- One row is written to LUBM_LOAD_LOG per call, with THREAD = -1: the SQL state and
-- message on failure, or '00000' and an empty message on success.
create procedure DB.DBA.RDF_LOAD_LUBM_RDFXML (in filename varchar, in ctr integer)
{
  declare sta datetime; -- start time
  declare ro_id_dict, app_env any;
  -- If data should be free-text indexed then we create a dictionary for 'graph keywords'
  -- NOTE(review): the dictionary is handed to the callbacks via app_env but is not read back
  -- in this procedure after parsing -- confirm that this is the intended "cheat".
  if (__rdf_obj_ft_rule_count_in_graph (iri_to_id ('lubm')))
    ro_id_dict := dict_new (5000);
  else
    ro_id_dict := null;
  -- We create environment for callbacks:
  app_env := vector (null, ro_id_dict);
  rollback work;
  sta := now ();
  -- Prepare error handler to keep records of loading errors:
  declare exit handler for sqlstate '*' {
    rollback work;
    -- Leave the auto-commit mode set below before writing the log record.
    log_enable (1, 1);
    insert into LUBM_LOAD_LOG (THREAD, TIDX, START, FINISH, FILE, STATE, MSG)
    values (-1, ctr, sta, now(), filename, __SQL_STATE, __SQL_MESSAGE);
    dbg_obj_princ (now(), ctr, filename, __SQL_STATE, __SQL_MESSAGE);
    commit work;
    return;
    };
  -- Enable auto-commit without transaction log
  log_enable (2, 1);
  -- Finally, we start the parser with callbacks that will place quads to database:
  rdf_load_rdfxml (file_to_string (filename), 0,
    'lubm',
    vector (
      'DB.DBA.TTLP_EV_NEW_GRAPH',
      'DB.DBA.TTLP_EV_NEW_BLANK',
      '!iri_to_id',
      'DB.DBA.TTLP_EV_TRIPLE',
      'DB.DBA.TTLP_EV_TRIPLE_L',
      '' ),
    app_env,
    'lubm' );
  -- Revert transactional behavior to "normal" and record statistics/state.
  log_enable (1, 1);
  insert into LUBM_LOAD_LOG (THREAD, TIDX, START, FINISH, FILE, STATE, MSG)
  values (-1, ctr, sta, now(), filename, '00000', '');
  -- Auto-commit is off again at this point, so commit the log record explicitly,
  -- exactly as the error handler above does.
  commit work;
  -- Print a progress mark for every 1000th task.
  if (0 = mod (ctr, 1000))
    dbg_obj_princ (now (), ctr, filename);
}
;

-- This procedure forms an asynchronous queue of parsers: one file -> one call of
-- DB.DBA.RDF_LOAD_LUBM_RDFXML -> one task. Files are queued round-robin over the given
-- directories (one file from each directory per pass), then the procedure waits until
-- all queued tasks are complete.
create procedure LUBM_LOAD_LOG2 (
  in dirnames any, -- Vector of names of directories with source files
  in thread_count integer, -- Maximum allowed number of parsers running at the same time
  in decimation_ratio integer := 1) -- 1 to load all files, 2 to load every second file, 10 to load every tenth file, etc. Must be positive.
{
  declare aq, -- queue of tasks; one file -> one parser call -> one task
    dirlists, -- vector of vectors of filenames in source directories
    fctrs, -- vector, one file counter per directory
    dirlist_lens any; -- vector, one file count per directory
  declare serialctr, dirty, dirctr, dircount integer;
  -- A non-positive ratio would never advance the per-directory counters,
  -- so the same files would be queued again and again without end.
  if (decimation_ratio < 1)
    signal ('22023', 'LUBM_LOAD_LOG2: decimation_ratio must be a positive integer');
  -- The three working vectors start as copies of dirnames just to get the right length;
  -- every element is overwritten in the loop below.
  dirlists := dirnames;
  dirlist_lens := dirnames;
  fctrs := dirnames;
  dircount := length (dirnames);
  -- Load all filenames in all directories.
  for (dirctr := 0; dirctr < dircount; dirctr := dirctr + 1)
    {
      dirlists[dirctr] := sys_dirlist (dirnames[dirctr], 1);
      dirlist_lens[dirctr] := length (dirlists[dirctr]);
      fctrs[dirctr] := 0;
    }
  -- Create queue of tasks with required "width".
  aq := async_queue (thread_count);
  serialctr := 0;
  -- In a loop for all directories and all files, form a queue of parsing tasks.
  -- 'dirty' tells whether the last pass over the directories queued at least one file;
  -- the first pass that queues nothing ends the loop.
  dirty := 1;
  while (dirty)
    {
      dirty := 0;
      for (dirctr := 0; dirctr < dircount; dirctr := dirctr + 1)
        {
          if (fctrs[dirctr] < dirlist_lens[dirctr])
            {
              aq_request (aq, 'DB.DBA.RDF_LOAD_LUBM_RDFXML',
                vector (concat (dirnames[dirctr], '/', dirlists[dirctr][fctrs[dirctr]]), serialctr) );
              fctrs[dirctr] := fctrs[dirctr] + decimation_ratio;
              serialctr := serialctr + 1;
              dirty := 1;
            }
        }
    }
  -- When there are no more files to queue for parsing, stay here and wait for completion of the last task.
  aq_wait_all (aq);
}
;

-- That's how to launch a queue of single-thread parsers:
checkpoint;
-- Uncomment to get records of load levels (for maintainers only).
-- The 'load ldmeter.sql' line near the top of this script must be uncommented as well.
-- ld_meter_run (600) &
-- Three directories, at most 7 parsers running at the same time.
-- NOTE(review): the (2 + N) rule of thumb given earlier suggests N = 6 for 8 CPU cores -- confirm which value is intended.
LUBM_LOAD_LOG2 (vector ('data-0', 'data-1', 'data-2'), 7);
-- Make the loaded data durable, then replace the 100-hour checkpoint interval set at the start of the script.
checkpoint;
checkpoint_interval (60);
shutdown;

Reply via email to