I've been playing around with Spark off and on for the past month and have
developed some XML helper utilities that let me filter and transform an XML
dataset (we have a lot of XML content). I'm posting to see whether there is
any interest in this effort (I would be happy to place the code in a public
git repo) and/or whether something already provides this capability (so I'm
not wasting my time). Under the covers, I'm leveraging Saxon-HE. I'll first
discuss the 'filtering' aspect.
Assuming you have already created a PairRDD (with the key being the identifier
for the XML document and the value being the actual XML document), you can
easily do the following from the spark-shell to filter this PairRDD based on
an arbitrary XPath expression.
## Start the spark-shell (and copy the jar file to the executors)
[root@ip-10-233-73-204 spark]$ ./bin/spark-shell --jars lib/uber-SparkUtils-0.1.jar
## Bring in the sequence file (2 million records)
scala> val xmlKeyPair = sc.sequenceFile[String,String]("s3n://darin/xml/part*").cache()
## Test values against an XPath expression (need to import the class from my jar)
scala> import com.darin.xml.XPathEvaluation
scala> val resultsRDD = xmlKeyPair.filter{case(k,v) => XPathEvaluation.evaluateString(v, "/doc/meta[year='2010']")}
## Save the results as a Hadoop sequence file
scala> resultsRDD.saveAsSequenceFile("s3n://darin/xml/results")
## Run more XPath expressions to create more filtered datasets, etc.
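For readers who want to try the same idea without the jar, here is a minimal sketch of an XPath-based partition filter. It is hypothetical (the `XPathFilter` object and `filterPartition` method are not part of the author's utilities) and it swaps in the JDK's built-in `javax.xml.xpath` engine, which only supports XPath 1.0, in place of Saxon-HE. It compiles the expression once per partition rather than once per record.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.{XPathConstants, XPathFactory}
import org.xml.sax.InputSource

// Hypothetical helper, NOT the author's com.darin.xml.XPathEvaluation.
// Uses the JDK's XPath 1.0 engine instead of Saxon-HE.
object XPathFilter {
  // Keeps the (id, xml) pairs whose document satisfies `expr`.
  // The expression is compiled once per iterator (once per Spark partition
  // when used with mapPartitions). XPathExpression and DocumentBuilder are
  // not thread-safe, so each partition creates its own instances.
  def filterPartition(records: Iterator[(String, String)],
                      expr: String): Iterator[(String, String)] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val compiled = XPathFactory.newInstance().newXPath().compile(expr)
    records.filter { case (_, xml) =>
      val doc = builder.parse(new InputSource(new StringReader(xml)))
      // BOOLEAN applies XPath's boolean() conversion: a node-set is true if non-empty.
      compiled.evaluate(doc, XPathConstants.BOOLEAN)
        .asInstanceOf[java.lang.Boolean].booleanValue()
    }
  }
}
```

Assuming the object is on the executors' classpath (for example via `--jars`), it could be applied in the spark-shell as `xmlKeyPair.mapPartitions(it => XPathFilter.filterPartition(it, "/doc/meta[year='2010']"), preservesPartitioning = true)`.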
In my case, the initial PairRDD is about 130GB. With 2 million documents, that
works out to an average of around 65KB per document. On a small 3-node AWS
cluster (m3.2xlarge) the above executes in around 10 minutes. I currently use
spot instances ($0.08/hr each), so this is very economical.
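The back-of-the-envelope numbers above can be checked with a few lines of Scala. This sketch assumes decimal units (1GB = 10^9 bytes) and computes a prorated compute cost only; actual charges depend on how the provider bills partial instance-hours.

```scala
// Back-of-the-envelope numbers from the paragraph above (decimal units assumed).
object ClusterMath {
  val totalBytes: Long = 130L * 1000 * 1000 * 1000 // ~130 GB of XML
  val documents: Long = 2000000L                   // 2 million records

  // Average document size in bytes.
  def avgDocBytes: Long = totalBytes / documents

  // Prorated cost: nodes x hourly spot price x elapsed hours.
  def proratedCost(nodes: Int, dollarsPerHour: Double, minutes: Double): Double =
    nodes * dollarsPerHour * (minutes / 60.0)
}
```

`ClusterMath.avgDocBytes` gives 65,000 bytes (about 65KB), and `ClusterMath.proratedCost(3, 0.08, 10)` comes to roughly $0.04 for a 10-minute run on three nodes.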
More complex XPath expressions can also be used.
Assume a sample record with the following structure:
<person gender="male">
  <age>32</age>
  <hobbies>
    <hobby>tennis</hobby>
    <hobby>golf</hobby>
    <hobby>programming</hobby>
  </hobbies>
  <name>
    <given-name>Darin</given-name>
    <surname>McBeath</surname>
  </name>
  <address>
    <street>8000 Park Lake Dr</street>
    <city>Mason</city>
    <state>Ohio</state>
  </address>
</person>
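As a quick way to experiment with this record outside Spark, here is a sketch that loads it and evaluates XPath 1.0 counterparts of a few of the simpler expressions that follow. It uses the JDK's built-in `javax.xml.xpath` engine rather than Saxon-HE, and the `SampleRecord` object and `holds` method are hypothetical names. The XPath 2.0 functions used further on (`exists`, `lower-case`, `string-join`, `tokenize`, `matches`, `xs:integer`) are not available in XPath 1.0 and need an XPath 2.0 processor such as Saxon-HE.

```scala
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.{XPathConstants, XPathFactory}
import org.xml.sax.InputSource

// Hypothetical test harness for the sample record, using XPath 1.0 only.
object SampleRecord {
  val person: String =
    """<person gender="male">
      |  <age>32</age>
      |  <hobbies><hobby>tennis</hobby><hobby>golf</hobby><hobby>programming</hobby></hobbies>
      |  <name><given-name>Darin</given-name><surname>McBeath</surname></name>
      |  <address><street>8000 Park Lake Dr</street><city>Mason</city><state>Ohio</state></address>
      |</person>""".stripMargin

  // Parses the record and converts the XPath result with boolean().
  def holds(expr: String): Boolean = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new InputSource(new StringReader(person)))
    XPathFactory.newInstance().newXPath()
      .evaluate(expr, doc, XPathConstants.BOOLEAN)
      .asInstanceOf[java.lang.Boolean].booleanValue()
  }
}
```

For example, `SampleRecord.holds("/person[age >= 30 and age <= 40]")` relies on XPath 1.0 converting `age` to a number for the relational comparison, which stands in for the explicit `xs:integer` casts in the XPath 2.0 version.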
The following XPath expressions could be used.
// Exact match where the surname equals 'McBeath'
"exists(/person/name[surname='McBeath'])"
// Exact match where the person gender attribute equals 'male'
"exists(/person[@gender='male'])"
// Where the person age is between 30 and 40
"exists(/person[(xs:integer(age) >= 30) and (xs:integer(age) <= 40)])"
// Exact match (after lower-case conversion) where the surname equals 'mcbeath'
"exists(/person/name[lower-case(string-join(surname/text(),' '))='mcbeath'])"
// Exact match (after lower-case conversion) where, within a name, surname equals 'mcbeath' and given-name equals 'darin'
"exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and lower-case(string-join(given-name/text(),' '))='darin']])"
// Exact match (after lower-case conversion) where, within a name, surname equals 'mcbeath' and given-name equals 'darin' or 'darby'
"exists(/person[name[lower-case(string-join(surname/text(),' '))='mcbeath' and lower-case(string-join(given-name/text(),' '))=('darin','darby')]])"
// Search/Token match (after lower-case conversion) where the immediate text node(s) of street contain the token 'lake'
"exists(/person/address[tokenize(lower-case(string-join(street/text(),' ')),'\\W+') = 'lake'])"
// Search/Token match (after lower-case conversion) where any text node descendant of person contains the token 'lake'
"exists(/person[tokenize(lower-case(string-join(.//text(),' ')),'\\W+') = 'lake'])"
// Search/Token 'wildcard' match (after lower-case conversion) where the immediate text node(s) of street contain a token matching the wildcard expression 'pa*'
"exists(/person/address[(for $i in tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return matches($i,'^pa.*$')) = true()])"
// Search/Token 'wildcard' match (after lower-case conversion) where the immediate text node(s) of street contain a token matching the wildcard expression 'pa?k'
"exists(/person/address[(for $i in tokenize(lower-case(string-join(street/text(),' ')),'\\W+') return matches($i,'^pa.?k$')) = true()])"
// Exact match where the first hobby is 'tennis'
"exists(/person/hobbies/hobby[position() = 1 and ./text() = 'tennis'])"
// Exact match where the first or second hobby is 'tennis'
"exists(/person/hobbies/hobby[position() <= 2 and ./text() = 'tennis'])"
// Exact match where the state does not equal 'Ohio'
"not(exists(//state[.='Ohio']))"
// Search/Token match (after lower-case conversion) where any text node descendant of person contains the phrase 'park lake'
"exists(/person[matches(string-join(tokenize(lower-case(string-join(.//text(),' ')),'\\W+'),' '), 'park lake')])"
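To make the token-match expressions above concrete, here is a plain-Scala approximation of what `tokenize(lower-case(string-join(texts,' ')),'\\W+') = 'lake'` and the `matches()` wildcard variants compute, with the text nodes passed in directly as strings (no XML parsing). It is a sketch, not Saxon's implementation: Java's `\W` is ASCII-based by default, so character classes differ slightly from XPath's, and empty tokens are dropped. The `TokenMatch` name is hypothetical.

```scala
// Hypothetical plain-Scala approximation of the XPath token-match logic.
object TokenMatch {
  // Approximates tokenize(lower-case(string-join(texts, ' ')), '\W+'):
  // join the text nodes with a space, lower-case, split on runs of non-word
  // characters. Empty tokens are dropped; they never equal a real token.
  def tokens(texts: Seq[String]): Seq[String] =
    texts.mkString(" ").toLowerCase.split("\\W+").toSeq.filter(_.nonEmpty)

  // The general comparison "= 'lake'" is true if ANY token equals 'lake'.
  def hasToken(texts: Seq[String], token: String): Boolean =
    tokens(texts).contains(token)

  // Like XPath matches(), findFirstIn is unanchored: 'pa.+' also matches
  // inside 'spark'. Anchoring with ^...$ restricts it to whole tokens.
  def hasTokenMatching(texts: Seq[String], regex: String): Boolean = {
    val r = regex.r
    tokens(texts).exists(t => r.findFirstIn(t).isDefined)
  }
}
```

With the sample street, `TokenMatch.tokens(Seq("8000 Park Lake Dr"))` yields `8000`, `park`, `lake`, `dr`, so the 'lake' token match succeeds while a search for 'la' does not. The unanchored pattern `pa.+` would also accept a street such as "Spark St", which is why anchored patterns are safer for wildcard matching.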
This is very much a work in progress, but I'm curious whether there is
interest in the Spark community for something like this. I have also done
something similar with 'transform' that would allow you to easily transform an
XML document into something else (other XML, JSON, etc.). If there is
interest, I can post about that as well.
Darin.