Updated the parser to stream large files for fn:collection. - The SAXContentHandler can now emit the elements matched by a child path expression instead of the complete document. - The collection function implementation has been updated to use this.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/44ccedb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/44ccedb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/44ccedb8 Branch: refs/heads/master Commit: 44ccedb806b09d153f873cdd364e7c9eaf111941 Parents: 7d02759 Author: Preston Carman <[email protected]> Authored: Mon May 19 16:35:15 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Mon May 19 16:35:15 2014 -0700 ---------------------------------------------------------------------- .../VXQueryCollectionOperatorDescriptor.java | 93 +-------- .../vxquery/xmlparser/SAXContentHandler.java | 197 +++++++++++++++++-- .../org/apache/vxquery/xmlparser/XMLParser.java | 30 ++- 3 files changed, 220 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/44ccedb8/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java index 121f0f9..1f4bb2f 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java @@ -18,28 +18,24 @@ package org.apache.vxquery.metadata; import java.io.File; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; -import org.apache.vxquery.datamodel.accessors.PointablePool; -import 
org.apache.vxquery.datamodel.accessors.PointablePoolFactory; -import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; -import org.apache.vxquery.exceptions.SystemException; -import org.apache.vxquery.runtime.functions.step.ChildPathStepOperatorDescriptor; +import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.types.SequenceType; import org.apache.vxquery.xmlparser.ITreeNodeIdProvider; import org.apache.vxquery.xmlparser.TreeNodeIdProvider; import org.apache.vxquery.xmlparser.XMLParser; -import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; import edu.uci.ics.hyracks.api.context.IHyracksTaskContext; import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable; import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider; import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry; -import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; @@ -71,13 +67,11 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO final int fieldOutputCount = recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0).getFieldCount(); final ByteBuffer frame = ctx.allocateFrame(); final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize(), fieldOutputCount); - final ArrayBackedValueStorage abvsFileNode = new ArrayBackedValueStorage(); final short partitionId = (short) ctx.getTaskAttemptId().getTaskId().getPartition(); final ITreeNodeIdProvider nodeIdProvider = new TreeNodeIdProvider(partitionId, dataSourceId, totalDataSources); final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId(); - final int 
frameSize = ctx.getFrameSize(); - final PointablePool ppool = PointablePoolFactory.INSTANCE.createPointablePool(); - final ChildPathStepOperatorDescriptor childPathStep = new ChildPathStepOperatorDescriptor(ctx, ppool); + final DynamicContext dCtx = (DynamicContext) ctx.getJobletContext().getGlobalJobData(); + final List<SequenceType> childSequenceTypes = new ArrayList<SequenceType>(); final String collectionName = collectionPartitions[partition % collectionPartitions.length]; final XMLParser parser = new XMLParser(false, nodeIdProvider);; @@ -87,6 +81,10 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO public void open() throws HyracksDataException { appender.reset(frame, true); writer.open(); + + for (int typeCode : childSeq) { + childSequenceTypes.add(dCtx.getStaticContext().lookupSequenceType(typeCode)); + } } @Override @@ -102,7 +100,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO Iterator<File> it = FileUtils.iterateFiles(collectionDirectory, new VXQueryIOFileFilter(), TrueFileFilter.INSTANCE); while (it.hasNext()) { - addNextXmlNode(it.next(), t); + parser.parseOutElements(it.next(), frame, appender, writer, fta, t, childSequenceTypes); } } } else { @@ -111,77 +109,6 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO } } - /** - * Add the document node to the frame output. - */ - private void addNextXmlNode(File file, int t) throws HyracksDataException { - // Now add new field. - abvsFileNode.reset(); - try { - parser.parseFile(file, abvsFileNode); - } catch (HyracksDataException e) { - e.setNodeId(nodeId); - throw e; - } - - TaggedValuePointable tvp = ppool.takeOne(TaggedValuePointable.class); - if (childSeq.isEmpty()) { - // Can not fit XML file into frame. 
- if (frameSize <= (abvsFileNode.getLength() - abvsFileNode.getStartOffset())) { - throw new HyracksDataException( - "XML node is to large for the current frame size (VXQueryCollectionOperatorDescriptor.addXmlFile)."); - } - tvp.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength()); - addNodeToTuple(tvp, t); - } else { - // Process child nodes. - tvp.set(abvsFileNode.getByteArray(), abvsFileNode.getStartOffset(), abvsFileNode.getLength()); - processChildStep(tvp, t); - } - ppool.giveBack(tvp); - } - - private void processChildStep(TaggedValuePointable tvp, int t) throws HyracksDataException { - try { - childPathStep.init(tvp, childSeq); - } catch (SystemException e) { - throw new HyracksDataException("Child path step failed to load node tree."); - } - try { - TaggedValuePointable result = ppool.takeOne(TaggedValuePointable.class); - while (childPathStep.step(result)) { - addNodeToTuple(result, t); - } - ppool.giveBack(result); - } catch (AlgebricksException e) { - throw new HyracksDataException(e); - } - } - - private void addNodeToTuple(TaggedValuePointable result, int t) throws HyracksDataException { - // Send to the writer. - if (!addNodeToTupleAppender(result, t)) { - FrameUtils.flushFrame(frame, writer); - appender.reset(frame, true); - if (!addNodeToTupleAppender(result, t)) { - throw new HyracksDataException( - "Could not write frame (VXQueryCollectionOperatorDescriptor.createPushRuntime)."); - } - } - } - - private boolean addNodeToTupleAppender(TaggedValuePointable result, int t) throws HyracksDataException { - // First copy all new fields over. 
- if (fta.getFieldCount() > 0) { - for (int f = 0; f < fta.getFieldCount(); ++f) { - if (!appender.appendField(fta, t, f)) { - return false; - } - } - } - return appender.appendField(result.getByteArray(), result.getStartOffset(), result.getLength()); - } - @Override public void fail() throws HyracksDataException { writer.fail(); http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/44ccedb8/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java index a8ec0b9..5a64ddd 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/SAXContentHandler.java @@ -16,9 +16,11 @@ package org.apache.vxquery.xmlparser; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.vxquery.datamodel.accessors.TaggedValuePointable; import org.apache.vxquery.datamodel.accessors.nodes.NodeTreePointable; import org.apache.vxquery.datamodel.builders.nodes.AbstractNodeBuilder; import org.apache.vxquery.datamodel.builders.nodes.AttributeNodeBuilder; @@ -30,6 +32,10 @@ import org.apache.vxquery.datamodel.builders.nodes.PINodeBuilder; import org.apache.vxquery.datamodel.builders.nodes.TextNodeBuilder; import org.apache.vxquery.datamodel.values.ValueTag; import org.apache.vxquery.types.BuiltinTypeQNames; +import org.apache.vxquery.types.ElementType; +import org.apache.vxquery.types.NameTest; +import org.apache.vxquery.types.NodeType; +import org.apache.vxquery.types.SequenceType; import org.apache.vxquery.xmlquery.query.XQueryConstants; import org.xml.sax.Attributes; import org.xml.sax.ContentHandler; @@ -37,47 +43,79 @@ import 
org.xml.sax.Locator; import org.xml.sax.SAXException; import org.xml.sax.ext.LexicalHandler; +import edu.uci.ics.hyracks.api.comm.IFrameWriter; +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; +import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable; import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; +import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; public class SAXContentHandler implements ContentHandler, LexicalHandler { - private final ArrayBackedValueStorage docABVS; + private final AttributeNodeBuilder anb; - private final boolean createNodeIds; + private FrameTupleAppender appender; private final boolean attachTypes; - private final ITreeNodeIdProvider nodeIdProvider; - - private final ArrayBackedValueStorage tempABVS; + private final StringBuilder buffer; - private final DocumentNodeBuilder docb; + private String[] childLocalName = null; - private final TextNodeBuilder tnb; + private String[] childUri = null; private final CommentNodeBuilder cnb; - private final PINodeBuilder pinb; - - private final AttributeNodeBuilder anb; + private final boolean createNodeIds; private final DictionaryBuilder db; - private final StringBuilder buffer; + private int depth = 0; + + private final ArrayBackedValueStorage docABVS; + + private final DocumentNodeBuilder docb; + + private final ArrayBackedValueStorage elementABVS; private final List<ElementNodeBuilder> enbStack; + private ByteBuffer frame; + private final List<ElementNodeBuilder> freeENBList; + private FrameTupleAccessor fta; + private int nodeIdCounter; + private final ITreeNodeIdProvider nodeIdProvider; + private boolean pendingText; + private final PINodeBuilder pinb; + + private final ArrayBackedValueStorage resultABVS; + + private boolean[] subElement = null; + + private int t; + + private final 
ArrayBackedValueStorage tempABVS; + + private final TextNodeBuilder tnb; + + private final TaggedValuePointable tvp; + + private IFrameWriter writer; + public SAXContentHandler(boolean attachTypes, ITreeNodeIdProvider nodeIdProvider) { docABVS = new ArrayBackedValueStorage(); - this.createNodeIds = nodeIdProvider != null; + elementABVS = new ArrayBackedValueStorage(); + resultABVS = new ArrayBackedValueStorage(); + tempABVS = new ArrayBackedValueStorage(); + createNodeIds = nodeIdProvider != null; this.attachTypes = attachTypes; this.nodeIdProvider = nodeIdProvider; - this.tempABVS = new ArrayBackedValueStorage(); docb = new DocumentNodeBuilder(); tnb = new TextNodeBuilder(); cnb = new CommentNodeBuilder(); @@ -88,6 +126,7 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { enbStack = new ArrayList<ElementNodeBuilder>(); freeENBList = new ArrayList<ElementNodeBuilder>(); pendingText = false; + tvp = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable(); } @Override @@ -102,6 +141,9 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { flushText(); docb.endChildrenChunk(); docb.finish(); + if (subElement == null) { + writeElement(); + } } catch (IOException e) { e.printStackTrace(); throw new SAXException(e); @@ -115,6 +157,14 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { ElementNodeBuilder enb = enbStack.remove(enbStack.size() - 1); enb.endChildrenChunk(); endChildInParent(enb); + + if (foundChildPathStep()) { + writeElement(); + } + if (subElement != null && depth <= subElement.length) { + subElement[depth - 1] = false; + } + depth--; freeENB(enb); } catch (IOException e) { e.printStackTrace(); @@ -178,12 +228,22 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { @Override public void startElement(String uri, String localName, String name, Attributes atts) throws SAXException { + depth++; + // Check path step if it exists. 
+ if (subElement != null && depth <= subElement.length) { + if (uri.compareTo(childUri[depth - 1]) == 0) { + if (localName.compareTo(childLocalName[depth - 1]) == 0) { + subElement[depth - 1] = true; + } + } + } + try { flushText(); int idx = name.indexOf(':'); String prefix = idx < 0 ? "" : name.substring(0, idx); ElementNodeBuilder enb = createENB(); - startChildInParent(enb); + startChildInParent(enb, foundChildPathStep()); int uriCode = db.lookup(uri); int localNameCode = db.lookup(localName); int prefixCode = db.lookup(prefix); @@ -296,7 +356,60 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { public void startEntity(String name) throws SAXException { } - public void write(ArrayBackedValueStorage abvs) throws IOException { + public void setChildPathSteps(List<SequenceType> childSeq) { + // this.childSeq = childSeq; + if (!childSeq.isEmpty()) { + subElement = new boolean[childSeq.size()]; + childUri = new String[childSeq.size()]; + childLocalName = new String[childSeq.size()]; + } + + int index = 0; + for (SequenceType sType : childSeq) { + NodeType nodeType = (NodeType) sType.getItemType(); + ElementType eType = (ElementType) nodeType; + NameTest nameTest = eType.getNameTest(); + childUri[index] = getStringFromBytes(nameTest.getUri()); + childLocalName[index] = getStringFromBytes(nameTest.getLocalName());; + index++; + } + } + + public void setupElementWriter(ByteBuffer frame, FrameTupleAppender appender, IFrameWriter writer, + FrameTupleAccessor fta, int t) throws IOException { + this.frame = frame; + this.appender = appender; + this.writer = writer; + this.fta = fta; + this.t = t; + } + + public void writeElement() throws IOException { + resultABVS.reset(); + DataOutput out = resultABVS.getDataOutput(); + out.write(ValueTag.NODE_TREE_TAG); + byte header = NodeTreePointable.HEADER_DICTIONARY_EXISTS_MASK; + if (attachTypes) { + header |= NodeTreePointable.HEADER_TYPE_EXISTS_MASK; + } + if (createNodeIds) { + header |= 
NodeTreePointable.HEADER_NODEID_EXISTS_MASK; + } + out.write(header); + if (createNodeIds) { + out.writeInt(nodeIdProvider.getId()); + } + db.write(resultABVS); + if (subElement == null) { + out.write(docABVS.getByteArray(), docABVS.getStartOffset(), docABVS.getLength()); + } else { + out.write(elementABVS.getByteArray(), elementABVS.getStartOffset(), elementABVS.getLength()); + } + tvp.set(resultABVS.getByteArray(), resultABVS.getStartOffset(), resultABVS.getLength()); + addNodeToTuple(tvp, t); + } + + public void writeDocument(ArrayBackedValueStorage abvs) throws IOException { DataOutput out = abvs.getDataOutput(); out.write(ValueTag.NODE_TREE_TAG); byte header = NodeTreePointable.HEADER_DICTIONARY_EXISTS_MASK; @@ -330,8 +443,15 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { } private void startChildInParent(AbstractNodeBuilder anb) throws IOException { + startChildInParent(anb, false); + } + + private void startChildInParent(AbstractNodeBuilder anb, boolean track) throws IOException { if (enbStack.isEmpty()) { docb.startChild(anb); + } else if (track) { + elementABVS.reset(); + anb.reset(elementABVS); } else { peekENBStackTop().startChild(anb); } @@ -344,4 +464,51 @@ public class SAXContentHandler implements ContentHandler, LexicalHandler { peekENBStackTop().endChild(anb); } } + + private void addNodeToTuple(TaggedValuePointable result, int t) throws HyracksDataException { + // Send to the writer. + if (!addNodeToTupleAppender(result, t)) { + FrameUtils.flushFrame(frame, writer); + appender.reset(frame, true); + if (!addNodeToTupleAppender(result, t)) { + throw new HyracksDataException("Could not write frame (SAXContentHandler.addNodeToTuple)."); + } + } + } + + private boolean addNodeToTupleAppender(TaggedValuePointable result, int t) throws HyracksDataException { + // First copy all new fields over. 
+ if (fta.getFieldCount() > 0) { + for (int f = 0; f < fta.getFieldCount(); ++f) { + if (!appender.appendField(fta, t, f)) { + return false; + } + } + } + return appender.appendField(result.getByteArray(), result.getStartOffset(), result.getLength()); + } + + private String getStringFromBytes(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + UTF8StringPointable.toString(sb, bytes, 0); + return sb.toString(); + } + + /** + * Determines if the correct path step is active. + */ + private boolean foundChildPathStep() { + if (subElement.length != depth) { + // Not the correct depth. + return false; + } + for (boolean b : subElement) { + if (!b) { + // Found a path step that did not match. + return false; + } + } + return true; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/44ccedb8/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java ---------------------------------------------------------------------- diff --git a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java index 43402ed..edef1a1 100644 --- a/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java +++ b/vxquery-core/src/main/java/org/apache/vxquery/xmlparser/XMLParser.java @@ -19,23 +19,29 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.util.List; import java.util.zip.GZIPInputStream; import org.apache.vxquery.exceptions.VXQueryFileNotFoundException; import org.apache.vxquery.exceptions.VXQueryParseException; +import org.apache.vxquery.types.SequenceType; import org.xml.sax.InputSource; import org.xml.sax.SAXException; import org.xml.sax.XMLReader; import org.xml.sax.helpers.XMLReaderFactory; +import edu.uci.ics.hyracks.api.comm.IFrameWriter; import 
edu.uci.ics.hyracks.api.exceptions.HyracksDataException; import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage; +import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; public class XMLParser { final XMLReader parser; final SAXContentHandler handler; final InputSource in; - + public XMLParser(boolean attachTypes, ITreeNodeIdProvider idProvider) throws HyracksDataException { try { parser = XMLReaderFactory.createXMLReader(); @@ -56,7 +62,27 @@ public class XMLParser { in.setCharacterStream(new InputStreamReader(new FileInputStream(file))); } parser.parse(in); - handler.write(abvs); + handler.writeDocument(abvs); + } catch (FileNotFoundException e) { + throw new VXQueryFileNotFoundException(e, file); + } catch (SAXException e) { + throw new VXQueryParseException(e, file); + } catch (IOException e) { + throw new HyracksDataException(e); + } + } + + public void parseOutElements(File file, ByteBuffer frame, FrameTupleAppender appender, IFrameWriter writer, + FrameTupleAccessor fta, int t, List<SequenceType> childSeq) throws HyracksDataException { + try { + if (file.getName().toLowerCase().endsWith(".xml.gz")) { + in.setCharacterStream(new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))); + } else { + in.setCharacterStream(new InputStreamReader(new FileInputStream(file))); + } + handler.setChildPathSteps(childSeq); + handler.setupElementWriter(frame, appender, writer, fta, t); + parser.parse(in); } catch (FileNotFoundException e) { throw new VXQueryFileNotFoundException(e, file); } catch (SAXException e) {
