Repository: spark
Updated Branches:
  refs/heads/master 0df5ce1bc -> 153c2f9ac


[SPARK-16271][SQL] Implement Hive's UDFXPathUtil

## What changes were proposed in this pull request?
This patch ports Hive's UDFXPathUtil over to Spark so that it can be used to 
implement XPath functionality in Spark in the near future.
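
For illustration, a minimal sketch of how a caller could drive the ported utility
(the sample XML literals and the standalone driver below are hypothetical; only the
UDFXPathUtil eval* methods come from this patch):

    import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil

    // Each caller keeps its own instance, since UDFXPathUtil is not thread-safe.
    val util = new UDFXPathUtil

    // These mirror Hive's xpath_* UDF semantics; the inputs are illustrative only.
    val xml = "<a><b>b1</b><b>b2</b></a>"
    util.evalString(xml, "a/b[1]/text()")        // "b1"
    util.evalBoolean(xml, "a/b[3]")              // false: there is no third <b> element
    util.evalNumber("<a><c>-77</c></a>", "a/c")  // -77.0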

## How was this patch tested?
Added two new test suites, UDFXPathUtilSuite and ReusableStringReaderSuite. They 
have been ported over from Hive, but rewritten in Scala to leverage ScalaTest.

Author: petermaxlee <petermax...@gmail.com>

Closes #13961 from petermaxlee/xpath.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/153c2f9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/153c2f9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/153c2f9a

Branch: refs/heads/master
Commit: 153c2f9ac12846367a09684fd875c496d350a603
Parents: 0df5ce1
Author: petermaxlee <petermax...@gmail.com>
Authored: Tue Jun 28 21:07:52 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jun 28 21:07:52 2016 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/xml/UDFXPathUtil.java  | 192 +++++++++++++++++++
 .../xml/ReusableStringReaderSuite.scala         | 103 ++++++++++
 .../expressions/xml/UDFXPathUtilSuite.scala     |  99 ++++++++++
 3 files changed, 394 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/153c2f9a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
new file mode 100644
index 0000000..01a11f9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.namespace.QName;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Utility class for all XPath UDFs. Each UDF instance should keep an instance of this class.
+ *
+ * This is based on Hive's UDFXPathUtil implementation.
+ */
+public class UDFXPathUtil {
+  private XPath xpath = XPathFactory.newInstance().newXPath();
+  private ReusableStringReader reader = new ReusableStringReader();
+  private InputSource inputSource = new InputSource(reader);
+  private XPathExpression expression = null;
+  private String oldPath = null;
+
+  public Object eval(String xml, String path, QName qname) {
+    if (xml == null || path == null || qname == null) {
+      return null;
+    }
+
+    if (xml.length() == 0 || path.length() == 0) {
+      return null;
+    }
+
+    if (!path.equals(oldPath)) {
+      try {
+        expression = xpath.compile(path);
+      } catch (XPathExpressionException e) {
+        expression = null;
+      }
+      oldPath = path;
+    }
+
+    if (expression == null) {
+      return null;
+    }
+
+    reader.set(xml);
+
+    try {
+      return expression.evaluate(inputSource, qname);
+    } catch (XPathExpressionException e) {
+      throw new RuntimeException("Invalid expression '" + oldPath + "'", e);
+    }
+  }
+
+  public Boolean evalBoolean(String xml, String path) {
+    return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
+  }
+
+  public String evalString(String xml, String path) {
+    return (String) eval(xml, path, XPathConstants.STRING);
+  }
+
+  public Double evalNumber(String xml, String path) {
+    return (Double) eval(xml, path, XPathConstants.NUMBER);
+  }
+
+  public Node evalNode(String xml, String path) {
+    return (Node) eval(xml, path, XPathConstants.NODE);
+  }
+
+  public NodeList evalNodeList(String xml, String path) {
+    return (NodeList) eval(xml, path, XPathConstants.NODESET);
+  }
+
+  /**
+   * Reusable, non-threadsafe version of {@link StringReader}.
+   */
+  public static class ReusableStringReader extends Reader {
+
+    private String str = null;
+    private int length = -1;
+    private int next = 0;
+    private int mark = 0;
+
+    public ReusableStringReader() {
+    }
+
+    public void set(String s) {
+      this.str = s;
+      this.length = s.length();
+      this.mark = 0;
+      this.next = 0;
+    }
+
+    /** Check to make sure that the stream has not been closed */
+    private void ensureOpen() throws IOException {
+      if (str == null)
+        throw new IOException("Stream closed");
+    }
+
+    @Override
+    public int read() throws IOException {
+      ensureOpen();
+      if (next >= length)
+        return -1;
+      return str.charAt(next++);
+    }
+
+    @Override
+    public int read(char cbuf[], int off, int len) throws IOException {
+      ensureOpen();
+      if ((off < 0) || (off > cbuf.length) || (len < 0)
+        || ((off + len) > cbuf.length) || ((off + len) < 0)) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+      if (next >= length)
+        return -1;
+      int n = Math.min(length - next, len);
+      str.getChars(next, next + n, cbuf, off);
+      next += n;
+      return n;
+    }
+
+    @Override
+    public long skip(long ns) throws IOException {
+      ensureOpen();
+      if (next >= length)
+        return 0;
+      // Bound skip by beginning and end of the source
+      long n = Math.min(length - next, ns);
+      n = Math.max(-next, n);
+      next += n;
+      return n;
+    }
+
+    @Override
+    public boolean ready() throws IOException {
+      ensureOpen();
+      return true;
+    }
+
+    @Override
+    public boolean markSupported() {
+      return true;
+    }
+
+    @Override
+    public void mark(int readAheadLimit) throws IOException {
+      if (readAheadLimit < 0) {
+        throw new IllegalArgumentException("Read-ahead limit < 0");
+      }
+      ensureOpen();
+      mark = next;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      ensureOpen();
+      next = mark;
+    }
+
+    @Override
+    public void close() {
+      str = null;
+    }
+  }
+}

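A note on the design: eval() reuses a single ReusableStringReader and InputSource
across calls, so a new XML document can be handed to the XPath engine per invocation
without allocating a fresh StringReader each time. A minimal standalone sketch of that
reuse pattern (variable names are illustrative):

    import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil.ReusableStringReader

    val reader = new ReusableStringReader
    reader.set("first document")    // bind a buffer; position and mark reset to 0
    val buf = new Array[Char](5)
    reader.read(buf)                // fills buf with "first"
    reader.set("second document")   // rebind in place; no new Reader is allocated
    reader.read(buf)                // now reads "secon"
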
http://git-wip-us.apache.org/repos/asf/spark/blob/153c2f9a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
new file mode 100644
index 0000000..e06d209
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import java.io.IOException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil.ReusableStringReader
+
+/**
+ * Unit tests for [[UDFXPathUtil.ReusableStringReader]].
+ *
+ * Loosely based on Hive's TestReusableStringReader.java.
+ */
+class ReusableStringReaderSuite extends SparkFunSuite {
+
+  private val fox = "Quick brown fox jumps over the lazy dog."
+
+  test("empty reader") {
+    val reader = new ReusableStringReader
+
+    intercept[IOException] {
+      reader.read()
+    }
+
+    intercept[IOException] {
+      reader.ready()
+    }
+
+    reader.close()
+  }
+
+  test("mark reset") {
+    val reader = new ReusableStringReader
+
+    if (reader.markSupported()) {
+      reader.asInstanceOf[ReusableStringReader].set(fox)
+      assert(reader.ready())
+
+      val cc = new Array[Char](6)
+      var read = reader.read(cc)
+      assert(read == 6)
+      assert("Quick " == new String(cc))
+
+      reader.mark(100)
+
+      read = reader.read(cc)
+      assert(read == 6)
+      assert("brown " == new String(cc))
+
+      reader.reset()
+      read = reader.read(cc)
+      assert(read == 6)
+      assert("brown " == new String(cc))
+    }
+    reader.close()
+  }
+
+  test("skip") {
+    val reader = new ReusableStringReader
+    reader.asInstanceOf[ReusableStringReader].set(fox)
+
+    // skip the entire data:
+    var skipped = reader.skip(fox.length() + 1)
+    assert(fox.length() == skipped)
+    assert(-1 == reader.read())
+
+    reader.asInstanceOf[ReusableStringReader].set(fox) // reset the data
+    val cc = new Array[Char](6)
+    var read = reader.read(cc)
+    assert(read == 6)
+    assert("Quick " == new String(cc))
+
+    // skip part of the data:
+    skipped = reader.skip(30)
+    assert(skipped == 30)
+    read = reader.read(cc)
+    assert(read == 4)
+    assert("dog." == new String(cc, 0, read))
+
+    // skip when already at EOF:
+    skipped = reader.skip(300)
+    assert(skipped == 0, skipped)
+    assert(reader.read() == -1)
+
+    reader.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/153c2f9a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
new file mode 100644
index 0000000..a5614f8
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import javax.xml.xpath.XPathConstants.STRING
+
+import org.w3c.dom.Node
+import org.w3c.dom.NodeList
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for [[UDFXPathUtil]]. Loosely based on Hive's TestUDFXPathUtil.java.
+ */
+class UDFXPathUtilSuite extends SparkFunSuite {
+
+  private lazy val util = new UDFXPathUtil
+
+  test("illegal arguments") {
+    // null args
+    assert(util.eval(null, "a/text()", STRING) == null)
+    assert(util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", null, STRING) == null)
+    assert(
+      util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text()", null) == null)
+
+    // empty String args
+    assert(util.eval("", "a/text()", STRING) == null)
+    assert(util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "", STRING) == null)
+
+    // wrong expression:
+    assert(
+      util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text(", STRING) == null)
+  }
+
+  test("generic eval") {
+    val ret =
+      util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/c[2]/text()", STRING)
+    assert(ret == "c2")
+  }
+
+  test("boolean eval") {
+    var ret =
+      util.evalBoolean("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[1]/text()")
+    assert(ret == true)
+
+    ret = util.evalBoolean("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[4]")
+    assert(ret == false)
+  }
+
+  test("string eval") {
+    var ret =
+      util.evalString("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[3]/text()")
+    assert(ret == "b3")
+
+    ret =
+      util.evalString("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[4]/text()")
+    assert(ret == "")
+
+    ret = util.evalString(
+      "<a><b>true</b><b k=\"foo\">FALSE</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[2]/@k")
+    assert(ret == "foo")
+  }
+
+  test("number eval") {
+    var ret =
+      util.evalNumber("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/c[2]")
+    assert(ret == -77.0d)
+
+    ret = util.evalNumber(
+      "<a><b>true</b><b k=\"foo\">FALSE</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[2]/@k")
+    assert(ret.isNaN)
+  }
+
+  test("node eval") {
+    val ret = util.evalNode("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/c[2]")
+    assert(ret != null && ret.isInstanceOf[Node])
+  }
+
+  test("node list eval") {
+    val ret = util.evalNodeList("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/*")
+    assert(ret != null && ret.isInstanceOf[NodeList])
+    assert(ret.asInstanceOf[NodeList].getLength == 5)
+  }
+}

