[ 
https://issues.apache.org/jira/browse/JENA-2309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17505891#comment-17505891
 ] 

Claus Stadler commented on JENA-2309:
-------------------------------------

Hi Andy, thanks for all the information!

I opened this issue to communicate all the problems we had to tackle in the 
past (especially during the transition from 3.17.0 to 4.0.0). It may very well 
be that certain things are already solved by now. In the coming days I will 
have a look at which problems still exist for us.

We not only looked at Elephas for ideas but actually integrated it into our 
sansa-stack project, so we were quite worried about what impact the retirement 
would have. For reading RDF, however, we created our own "generic parser" Hadoop 
input system which can split TriG, CSV with newlines, and JSON arrays, and which 
is thus also suitable for Turtle and N-Triples. I had actually been thinking 
about contributing it to Elephas, but back then it wasn't quite as stable as it 
is now.

As for output, we tried Elephas' FileOutputFormats, but Apache Spark performed 
better when using rdd.saveAsTextFile together with a mapPartitions operation 
that maps RDF to lines of text via StreamRDF. I don't recall the details of the 
exact issues we had anymore, though.

 
Of course there is a tradeoff when parsing RDF in a distributed setting.

For example, when ingesting Turtle data, the Hadoop InputFormat first scans the 
beginning of the file for prefixes on the driver (i.e. this part is not 
distributed). We optimistically assume that the prefixes do not change in the 
middle of the data. We then put all the known prefixes into the job context and 
split the input.
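
A minimal sketch of that driver-side scan, assuming the head of the file fits 
into a byte buffer and that a parse error at the cut-off point can simply be 
swallowed (class and method names are illustrative):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.system.PrefixMap;
import org.apache.jena.riot.system.PrefixMapFactory;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.system.StreamRDFBase;

public class DriverPrefixScan {
    /** Collect the prefixes declared in the first headBytes of a Turtle/TriG stream. */
    public static PrefixMap scanPrefixes(InputStream in, int headBytes) throws Exception {
        byte[] head = in.readNBytes(headBytes); // Java 9+
        PrefixMap prefixes = PrefixMapFactory.create();
        StreamRDF sink = new StreamRDFBase() {
            @Override
            public void prefix(String prefix, String iri) {
                prefixes.add(prefix, iri);
            }
        };
        try {
            RDFParser.create()
                    .source(new ByteArrayInputStream(head))
                    .lang(Lang.TRIG)
                    .parse(sink);
        } catch (RiotException e) {
            // The head is almost always cut off mid-statement;
            // keep whatever prefixes were seen up to that point.
        }
        return prefixes;
    }
}
{code}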

Now each worker only knows its split of the data (some byte start and end 
offset) and the context with the prefixes.

We then search for record starts, i.e. byte positions at which, given the 
prefixes and the RDF parser, we can successfully probe for a configurable 
number of triples without error.
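
Roughly, the probe looks like this (a sketch; ProbeSuccess is a hypothetical 
marker exception used to abort the parser after n triples, and seeding the 
parser with the prefixes from the job context is exactly the missing piece 
discussed next):

{code:java}
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.system.StreamRDFBase;

public class RecordStartProbe {
    /** Hypothetical marker exception: aborts parsing once enough triples were seen. */
    static class ProbeSuccess extends RuntimeException {}

    /** Returns true if n triples parse without error from the candidate position. */
    public static boolean probe(InputStream candidate, int n) {
        AtomicInteger count = new AtomicInteger();
        StreamRDF sink = new StreamRDFBase() {
            @Override
            public void triple(Triple triple) {
                if (count.incrementAndGet() >= n)
                    throw new ProbeSuccess();
            }
        };
        try {
            // Note: there is no public way yet to seed this parser with the
            // prefixes from the job context - the feature requested below.
            RDFParser.create().source(candidate).lang(Lang.TRIG).parse(sink);
            return true;  // split ended cleanly before n triples
        } catch (ProbeSuccess e) {
            return true;  // n triples parsed without error
        } catch (RiotException e) {
            return false; // not a valid record start
        }
    }
}
{code}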

For this we need to be able to preconfigure the RDFParser with our IRIxResolver 
and the prefixes from the Hadoop context - this feature is still missing, even 
though RDFParser internally creates those objects.

So we'd need e.g.
{code:java}
RDFParser.prologue(p)...
{code}

As cluster nodes work *independently* on the RDF splits, we simply have no 
clue about blank node scoping, so we just use the labels as given. Each node 
creates its own RDF parser instances, and each instance must parse the same 
label to the same node. In principle the driver could generate some random 
UUID, but this then needs to be broadcast to all worker nodes so that they can 
configure their RDF parsers in the same way.
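
A minimal sketch of that setup, assuming Spark's Java API and the seeded 
variant of Jena's scope-by-document-hash label policy (the plain "as given" 
policy would work as well):

{code:java}
import java.util.UUID;

import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.lang.LabelToNode;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class SharedBlankNodeConfig {
    public static void example(JavaSparkContext jsc) {
        // Driver: pick one seed for the whole job and broadcast it.
        Broadcast<UUID> bcSeed = jsc.broadcast(UUID.randomUUID());

        // Worker side (inside mapPartitions etc.): every parser instance uses
        // the same deterministic label policy, so equal labels yield equal nodes.
        LabelToNode labels = LabelToNode.createScopeByDocumentHash(bcSeed.value());
        // Alternatively: LabelToNode.createUseLabelAsGiven();
        RDFParser parser = RDFParser.create()
                .labelToNode(labels)
                .lang(Lang.TRIG)
                .source("part-00000.trig") // illustrative input
                .build();
    }
}
{code}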

For writing RDF out with Spark, each partition's RDF needs to be written out 
w.r.t. the prefixes in the node's context, but the prefixes themselves *must 
not be written out* - the only way to achieve that right now is to set 
WriterStreamRDFBase's internal prefix map and IRIx resolver via reflection.

The approach is roughly:

{code:java}
Broadcast<PrefixMap> bc = ...;
JavaRDD<Quad> rddOfQuads = ...;

// Per partition: configure a StreamRDF with the broadcast prefixes (used for
// abbreviation only, never emitted) and turn the quads into lines of text.
JavaRDD<String> rddOfLines = rddOfQuads.mapPartitions(quads ->
        serializeWithPrefixes(quads, bc.value())); // yields an Iterator<String>

// Prepend the prefix declarations once, as their own small RDD.
rddOfLines = sparkContext.union(prefixLinesRdd, rddOfLines);
rddOfLines.saveAsTextFile("output/path");
{code}
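
For reference, the reflection workaround mentioned above looks roughly like 
this - the field name "pMap" is an assumption about WriterStreamRDFBase's 
internals and may differ between Jena versions, which is exactly why a public 
hook would be preferable:

{code:java}
import java.lang.reflect.Field;

import org.apache.jena.riot.system.PrefixMap;
import org.apache.jena.riot.writer.WriterStreamRDFBase;

public class WriterPrefixHack {
    /** Inject a prefix map so the writer abbreviates without emitting @prefix lines. */
    public static void setPrefixMap(WriterStreamRDFBase writer, PrefixMap pm) {
        try {
            Field f = WriterStreamRDFBase.class.getDeclaredField("pMap"); // assumed name
            f.setAccessible(true);
            f.set(writer, pm);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Jena internals changed; hack no longer works", e);
        }
    }
}
{code}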


> There are several independent changes being suggested. They have different 
> timescales.
What I meant to say is that, given what I outlined above, this issue should 
mostly be about adding some additional constructor arguments, some context 
symbols or getters, and opening up some private APIs.
Of course, if there are things that are more complicated, it makes sense to 
treat them separately.


> Enhancing Riot for Big Data
> ---------------------------
>
>                 Key: JENA-2309
>                 URL: https://issues.apache.org/jira/browse/JENA-2309
>             Project: Apache Jena
>          Issue Type: Improvement
>          Components: RIOT
>    Affects Versions: Jena 4.5.0
>            Reporter: Claus Stadler
>            Priority: Major
>
> We have successfully managed to adapt Jena RIOT to work quite efficiently 
> within Apache Spark; however, we needed to make certain adaptations that rely 
> on brittle reflection hacks and APIs that are marked for removal (namely 
> PipedRDFIterator):
> In principle, for writing RDF data out, we implemented a mapPartitions 
> operation that maps the input RDF to lines of text via StreamRDF, which is 
> understood by Apache Spark's RDD.saveAsTextFile();
> However, for use with Big Data we need to
>  * disable blank node relabeling
>  * preconfigure the StreamRDF with a given set of prefixes (that is 
> broadcast to each node)
> Furthermore
>  * The default PrefixMapping implementation is very inefficient when it comes 
> to handling a dump of prefix.cc. I am using 2500 prefixes, and each RDF term 
> in the output results in a scan of the full prefix map.
>  * Even if the PrefixMapping is optimized, the recently added PrefixMap 
> adapter again does scanning - and it's a final class, so there is no easy 
> override.
> And finally, we have a use case that calls for relative IRIs in the RDF: we 
> are creating DCAT catalogs from directory content, as in this file:
> DCAT catalog with relative IRIs over directory content: [work-in-progress 
> example|https://hobbitdata.informatik.uni-leipzig.de/lsqv2/dumps/dcat.trig]
> If you retrieve the file with a semantic web client (riot, rapper, etc.), it 
> will automatically use the download location as the base URL, thus giving 
> absolute URLs to the published artifacts - regardless of which URL the 
> directory is hosted under.
>  * IRIxResolver: We rely on IRIProviderJDK, which states "do not use in 
> production"; however, it is the only one that lets us achieve the goal. [our 
> code|https://github.com/Scaseco/jenax/blob/dd51ef9a39013d4ddbb4806fcad36b03a4dbaa7c/jenax-arq-parent/jenax-arq-utils/src/main/java/org/aksw/jenax/arq/util/irixresolver/IRIxResolverUtils.java#L30]
>  * Prologue: We use reflection to set the resolver and would like a public 
> setResolver method. [our 
> code|https://github.com/Scaseco/jenax/blob/dd51ef9a39013d4ddbb4806fcad36b03a4dbaa7c/jenax-arq-parent/jenax-arq-utils/src/main/java/org/aksw/jenax/arq/util/prologue/PrologueUtils.java#L65]
>  * WriterStreamRDFBase: We need to be able to create instances of 
> WriterStreamRDF classes that we can configure with our own PrefixMap 
> instance (e.g. trie-backed) and our own LabelToNode strategy ("asGiven") - 
> [our 
> code|https://github.com/SANSA-Stack/SANSA-Stack/blob/40fa6f89f421eee22c9789973ec828ec3f970c33/sansa-spark-jena-java/src/main/java/net/sansa_stack/spark/io/rdf/output/RddRdfWriter.java#L387]
>  * PrefixMapAdapter: We need an adapter that inherits the performance 
> characteristics of the backing PrefixMapping. [our 
> code|https://github.com/Scaseco/jenax/blob/dd51ef9a39013d4ddbb4806fcad36b03a4dbaa7c/jenax-arq-parent/jenax-arq-utils/src/main/java/org/aksw/jenax/arq/util/prefix/PrefixMapAdapter.java#L57]
>  * PrefixMapping: We need a trie-based implementation for efficiency. We 
> created one based on the trie class in Jena, which in initial experiments was 
> sufficiently fast, though we did not benchmark whether e.g. PatriciaTrie from 
> Commons Collections would be faster. [our 
> code|https://github.com/Scaseco/jenax/blob/dd51ef9a39013d4ddbb4806fcad36b03a4dbaa7c/jenax-arq-parent/jenax-arq-utils/src/main/java/org/aksw/jenax/arq/util/prefix/PrefixMappingTrie.java#L27]
> With PrefixMappingTrie the profiler showed that the share of time spent in 
> abbreviate went from ~100% to 1% - though we are not totally sure about 
> standards conformance here.
>  * PipedRDFIterator / AsyncParser: We can read TriG as a splittable format 
> (which is pretty cool) - however, this requires being able to start and stop 
> the RDF parser at will for probing. In other words, AsyncParser needs to 
> return ClosableIterators whose close method actually stops the parsing 
> thread. Also, when scanning for prefixes we want to be able to create rules 
> such as "as long as the parser emits a prefix with fewer than e.g. 100 
> non-prefix events in between, keep looking for prefixes" - AsyncParser has 
> the API for this with EltStreamRDF, but it is private.
> For future-proofing, we'd like these use cases to be reflected in Jena.
> Because we have mostly sorted out all the above issues, I'd prefer to address 
> these things with only one or a few PRs (maybe the ClosableIterators on 
> AsyncParser would be more work, because our code only did that for 
> PipedRDFIterator and I haven't looked in detail into the new architecture).



