Hi,
at Talis we use MapReduce jobs in our ingestion pipeline to clean, validate
and deduplicate RDF data and to build all the indexes we need on our live
servers. These include indexes for Lucene/Solr and for TDB. With Lucene it is
relatively easy to use MapReduce to build separate Lucene segments and merge
them together at the end. We do not yet have a MapReduce job which we can use
to build TDB indexes. (Is there a way to efficiently merge two separate sets
of TDB indexes? Maybe this is another direction to explore.)

In theory (see an old message on the old jena-dev list: [1]) it is possible
to use MapReduce jobs to create a dictionary equivalent to TDB's node table.
However, the step from theory to practice is not always as smooth as it might
seem. I have recently taken a further step towards practice and I want to
share it. This is far from being 'production ready', and it remains to be
proven whether it is worthwhile at all, i.e. any faster or cheaper than
tdbloader2. Tdbloader2 is great and, if you have enough RAM on your machine,
a sequence of MapReduce jobs will probably not do better.

In any case, how can we build B+Tree indexes (for TDB) using MapReduce jobs?

A sequence of three MapReduce jobs:
-----------------------------------

Tdbloader3 is a sequence of three MapReduce jobs. The first MapReduce job
builds the node table and produces the input for the next job. The second
MapReduce job rewrites the input data using node ids in place of URIs, blank
nodes and literals. The last MapReduce job reuses the same BPlusTreeRewriter
used by tdbloader2 to finally create the TDB indexes (SPO, POS, OSP, GSPO,
etc.).
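
For concreteness, here is a minimal driver sketch (not the actual tdbloader3
code) showing how the first job could be configured and how the jobs chain
together, each job's output directory feeding the next. The class names
(FirstMapper, FirstReducer) refer to the illustrative sketches further below
and are hypothetical, as are the paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TDBLoader3Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job first = new Job(conf, "build node table");
        first.setJarByClass(TDBLoader3Driver.class);
        first.setMapperClass(FirstMapper.class);
        first.setReducerClass(FirstReducer.class);
        first.setMapOutputKeyClass(Text.class);
        first.setMapOutputValueClass(Text.class);
        first.setOutputKeyClass(LongWritable.class);
        first.setOutputValueClass(Text.class);
        first.setNumReduceTasks(1); // the node table must be built by one reducer
        FileInputFormat.addInputPath(first, new Path(args[0]));
        FileOutputFormat.setOutputPath(first, new Path("step1"));
        if (!first.waitForCompletion(true)) System.exit(1);

        // ... the second and third jobs are configured the same way,
        // reading "step1" and "step2" respectively.
    }
}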

The first MapReduce job:
------------------------

The map function of the first job parses N-Triples or N-Quads files using
RIOT and emits (key, value) pairs where the key is the RDF node and the value
is a unique identifier for that triple|quad concatenated with the position of
the RDF node within the triple|quad. For example, here are the input (<) and
outputs (>) for two quads:

< (0, <http://example.org/alice/foaf.rdf#me>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
      <http://xmlns.com/foaf/0.1/Person> <http://example.org/alice/foaf.rdf> .)
> (<http://example.org/alice/foaf.rdf#me>, e106576560405e9c91aa4a28d1b35b34|S)
> (<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>, e106576560405e9c91aa4a28d1b35b34|P)
> (<http://xmlns.com/foaf/0.1/Person>, e106576560405e9c91aa4a28d1b35b34|O)
> (<http://example.org/alice/foaf.rdf>, e106576560405e9c91aa4a28d1b35b34|G)

< (162, <http://example.org/alice/foaf.rdf#me> <http://xmlns.com/foaf/0.1/name>
        "Alice" <http://example.org/alice/foaf.rdf> .)
> (<http://example.org/alice/foaf.rdf#me>, 48bc9707b09e5574ba50d4f58f0e5fec|S)
> (<http://xmlns.com/foaf/0.1/name>, 48bc9707b09e5574ba50d4f58f0e5fec|P)
> ("Alice", 48bc9707b09e5574ba50d4f58f0e5fec|O)
> (<http://example.org/alice/foaf.rdf>, 48bc9707b09e5574ba50d4f58f0e5fec|G)
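
To make this concrete, here is a minimal sketch of such a map function (not
the actual tdbloader3 code). It naively tokenises each line on whitespace,
which is enough for the examples above but not for literals containing
spaces; the real job uses RIOT. Using the MD5 of the line as the unique
identifier is my assumption, suggested by the 32-hex-digit ids above:

import java.io.IOException;
import java.security.MessageDigest;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final String[] POSITIONS = { "S", "P", "O", "G" };

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String s = line.toString().trim();
        if (s.isEmpty()) return;
        // Drop the trailing ' .' and split on whitespace: good enough for
        // the lines above, but a real parser (RIOT) is needed in general.
        String[] nodes = s.substring(0, s.lastIndexOf('.')).trim().split("\\s+");
        String id = md5(s); // unique identifier for this triple|quad (assumed MD5)
        for (int i = 0; i < nodes.length; i++) {
            // key: the RDF node, value: identifier + position of the node
            context.write(new Text(nodes[i]), new Text(id + "|" + POSITIONS[i]));
        }
    }

    private static String md5(String s) throws IOException {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}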

The reduce function of the first job adds the RDF node it receives as key to
the node table and emits a (key, value) pair with the node id and the same
value it received. For example:

< ("Alice", 48bc9707b09e5574ba50d4f58f0e5fec|O)
> (0, 48bc9707b09e5574ba50d4f58f0e5fec|O)
< ("Bob", 5a1522c8f0bb1b775146e3f0b8066c4d|O)
> (11, 5a1522c8f0bb1b775146e3f0b8066c4d|O)
< ("Bob", 6e03d3250935a6bc8dcb64f14fa5972d|O)
> (11, 6e03d3250935a6bc8dcb64f14fa5972d|O)
< (<http://example.org/alice/foaf.rdf#me>, e106576560405e9c91aa4a28d1b35b34|S)
> (20, e106576560405e9c91aa4a28d1b35b34|S)
< (<http://example.org/alice/foaf.rdf#me>, 48bc9707b09e5574ba50d4f58f0e5fec|S)
> (20, 48bc9707b09e5574ba50d4f58f0e5fec|S)
< (<http://example.org/alice/foaf.rdf#me>, 9fa6ff77d07e40a805d19be921a79e7d|S)
> (20, 9fa6ff77d07e40a805d19be921a79e7d|S)
< (<http://example.org/alice/foaf.rdf#me>, 3e3e4c277b7db437f5cdf9bae65a02bc|S)
> (20, 3e3e4c277b7db437f5cdf9bae65a02bc|S)

We can only have a single reducer in this first job (and this can be a
problem, since the whole node table is built by one process).

The advantage is that the node table can be constructed without checking
whether an RDF node already exists, since the reducer receives each distinct
RDF node as key exactly once. Therefore we can simply append each RDF node to
the object file, get its id and add it to the hash->id B+Tree.
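
A sketch of such a reduce function (again, not the actual code). Here the
node table is simulated with a running byte offset, assuming TDB's object
file stores each node as a 4-byte length prefix plus its UTF-8 bytes; under
that assumption the ids come out as 0, 11, 20 as in the example above. The
real reducer would append to the object file and update the hash->id B+Tree:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FirstReducer extends Reducer<Text, Text, LongWritable, Text> {

    private long offset = 0L; // byte offset into the (simulated) object file

    @Override
    protected void reduce(Text node, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct RDF node arrives exactly once, so no existence check:
        // append it to the object file and use its offset as the node id.
        long id = offset;
        offset += 4 + node.getLength(); // length prefix + UTF-8 bytes (assumed)
        for (Text value : values) {
            context.write(new LongWritable(id), value); // (node id, "id|position")
        }
    }
}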

The second MapReduce job:
-------------------------

In the map function of the second job we swap things around: the unique
identifier of each triple|quad becomes the key, and the node id (with the
position) becomes the value:

< (0, 0 48bc9707b09e5574ba50d4f58f0e5fec|O)
> (48bc9707b09e5574ba50d4f58f0e5fec, 0|O)
< (37, 11 5a1522c8f0bb1b775146e3f0b8066c4d|O)
> (5a1522c8f0bb1b775146e3f0b8066c4d, 11|O)
< (75, 11 6e03d3250935a6bc8dcb64f14fa5972d|O)
> (6e03d3250935a6bc8dcb64f14fa5972d, 11|O)
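
A sketch of this map function, assuming the first job's output is read back
with TextInputFormat as lines of the form "<node id> <identifier>|<position>":

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // e.g. "11 5a1522c8f0bb1b775146e3f0b8066c4d|O"
        String[] tokens = line.toString().trim().split("\\s+");
        int bar = tokens[1].indexOf('|');
        String identifier = tokens[1].substring(0, bar);
        String position = tokens[1].substring(bar + 1);
        // key: the triple|quad identifier, value: node id + position
        context.write(new Text(identifier), new Text(tokens[0] + "|" + position));
    }
}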

In the reduce function we can reconstruct each triple|quad using node ids:

< (07a91c76410d888875ace43331a11fcd, 348|P)
< (07a91c76410d888875ace43331a11fcd, 527|S)
< (07a91c76410d888875ace43331a11fcd, 101|O)
< (07a91c76410d888875ace43331a11fcd, 62|G)
> ((null), 527 348 101 62)

< (3e3e4c277b7db437f5cdf9bae65a02bc, 20|S)
< (3e3e4c277b7db437f5cdf9bae65a02bc, 497|O)
< (3e3e4c277b7db437f5cdf9bae65a02bc, 425|P)
> ((null), 20 425 497)
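
A sketch of this reduce function: it gathers the node ids grouped under one
triple|quad identifier and emits them in S P O (G) order, as above:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text id, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String s = null, p = null, o = null, g = null;
        for (Text value : values) {
            String[] idAndPos = value.toString().split("\\|");
            if ("S".equals(idAndPos[1])) s = idAndPos[0];
            else if ("P".equals(idAndPos[1])) p = idAndPos[0];
            else if ("O".equals(idAndPos[1])) o = idAndPos[0];
            else g = idAndPos[0];
        }
        // Triples have no graph component; quads get a fourth id.
        String out = s + " " + p + " " + o + (g == null ? "" : " " + g);
        context.write(NullWritable.get(), new Text(out));
    }
}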

The third MapReduce job:
------------------------

The map function of the third (and last) job emits only keys. For each triple
three keys are emitted, and for each quad six, one per TDB index. Each key
contains the node ids in hex format (exactly the same format used by
tdbloader2). The MapReduce framework does the sorting (instead of the
external UNIX sort as in tdbloader2) and on the reduce side we can generate
the B+Tree indexes (SPO, POS, OSP, SPOG, etc.) as is done for tdbloader2. The
third job currently has only one reducer (because I found a problem with the
Hadoop version I am using), but 9 reducers should be used here (i.e. one per
TDB index).

This is an example of input/output for the map function:

< (0, 527 348 101 62)
> (000000000000020F 000000000000015C 0000000000000065 000000000000003E|SPOG, (null))
> (000000000000015C 0000000000000065 000000000000020F 000000000000003E|POSG, (null))
> (0000000000000065 000000000000020F 000000000000015C 000000000000003E|OSPG, (null))
> (000000000000003E 000000000000020F 000000000000015C 0000000000000065|GSPO, (null))
> (000000000000003E 000000000000015C 0000000000000065 000000000000020F|GPOS, (null))
> (000000000000003E 0000000000000065 000000000000020F 000000000000015C|GOSP, (null))
< (15, 20 425 497)
> (0000000000000014 00000000000001A9 00000000000001F1|SPO, (null))
> (00000000000001A9 00000000000001F1 0000000000000014|POS, (null))
> (00000000000001F1 0000000000000014 00000000000001A9|OSP, (null))

The reduce function does not emit any (key, value) pairs; it just creates the
B+Tree indexes.
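
A sketch of this map function (not the actual code). The permutation tables
reproduce the orderings in the example above (SPO, POS, OSP for triples;
SPOG, POSG, OSPG, GSPO, GPOS, GOSP for quads), and the '|index name' suffix
is what a custom partitioner could use to route each key to the right reducer
once nine reducers are in play:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ThirdMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Column permutations over (S, P, O) and (S, P, O, G) for each TDB index.
    private static final int[][] TRIPLE_PERMS = { {0,1,2}, {1,2,0}, {2,0,1} };
    private static final String[] TRIPLE_NAMES = { "SPO", "POS", "OSP" };
    private static final int[][] QUAD_PERMS = { {0,1,2,3}, {1,2,0,3}, {2,0,1,3},
                                                {3,0,1,2}, {3,1,2,0}, {3,2,0,1} };
    private static final String[] QUAD_NAMES = { "SPOG", "POSG", "OSPG",
                                                 "GSPO", "GPOS", "GOSP" };

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] ids = line.toString().trim().split("\\s+"); // e.g. "527 348 101 62"
        boolean isQuad = (ids.length == 4);
        int[][] perms = isQuad ? QUAD_PERMS : TRIPLE_PERMS;
        String[] names = isQuad ? QUAD_NAMES : TRIPLE_NAMES;
        for (int i = 0; i < perms.length; i++) {
            StringBuilder key = new StringBuilder();
            for (int col : perms[i]) {
                if (key.length() > 0) key.append(' ');
                // 16-digit uppercase hex, the same format used by tdbloader2
                key.append(String.format("%016X", Long.parseLong(ids[col])));
            }
            key.append('|').append(names[i]);
            context.write(new Text(key.toString()), NullWritable.get());
        }
    }
}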


The code is here (https://github.com/castagna/tdbloader3) and, I say it
again, this is just an experiment. If you try it, have suggestions on how to
improve it, or want to help with any of the TODOs [2], get in touch.

Paolo

[1] http://tech.groups.yahoo.com/group/jena-dev/message/46040
[2] https://github.com/castagna/tdbloader3/blob/master/TODO.txt
