Split a block if it contains more than one item with the tag we want

Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/61fec3b6
Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/61fec3b6
Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/61fec3b6

Branch: refs/heads/steven/hdfs
Commit: 61fec3b6759f01c3e8d5ace985228e81787af42a
Parents: 924a01b
Author: efikalti <[email protected]>
Authored: Thu Jul 30 11:06:00 2015 +0300
Committer: efikalti <[email protected]>
Committed: Thu Jul 30 11:06:00 2015 +0300

----------------------------------------------------------------------
 .../VXQueryCollectionOperatorDescriptor.java    | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/vxquery/blob/61fec3b6/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
index b7627d0..a930457 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/metadata/VXQueryCollectionOperatorDescriptor.java
@@ -32,6 +32,7 @@ import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -71,6 +72,7 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
     protected static final Logger LOGGER = 
Logger.getLogger(VXQueryCollectionOperatorDescriptor.class.getName());
     private HDFSFunctions hdfs;
     private String tag;
+    private final String START_TAG = "<?xml version=\"1.0\" 
encoding=\"utf-8\"?>\n";
 
     public VXQueryCollectionOperatorDescriptor(IOperatorDescriptorRegistry 
spec, VXQueryCollectionDataSource ds,
             RecordDescriptor rDesc) {
@@ -141,14 +143,8 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                         Path directory = new Path(collectionModifiedName);
                         Path xmlDocument;
                         if (tag != null) {
-                            String tags[] = tag.split("/");
-                            String start_tag = "<?xml version=\"1.0\" 
encoding=\"utf-8\"?>";
-                            String end_tag = "";
-                            for (int i = 0; i < tags.length - 1; i++) {
-                                start_tag = start_tag + "<" + tags[i] + ">";
-                                end_tag = end_tag + "</" + tags[i] + ">";
-                            }
-                            hdfs.setJob(directory.getName(), tags[tags.length 
- 1]);
+                            hdfs.setJob(directory.getName(), tag);
+                            tag = "<" + tag + ">";
                             Job job = hdfs.getJob();
                             InputFormat inputFormat = hdfs.getinputFormat();
                             try {
@@ -164,22 +160,36 @@ public class VXQueryCollectionOperatorDescriptor extends AbstractSingleActivityO
                                 List<FileSplit> inputSplits = 
splitsFactory.getSplits();
                                 ContextFactory ctxFactory = new 
ContextFactory();
                                 int size = inputSplits.size();
+                                InputStream stream;
+                                String value;
+                                RecordReader reader;
+                                TaskAttemptContext context;
                                 for (int i = 0; i < size; i++) {
                                     //read split
-                                    TaskAttemptContext context = 
ctxFactory.createContext(job.getConfiguration(), i);
-                                    RecordReader reader;
+                                    context = 
ctxFactory.createContext(job.getConfiguration(), i);
                                     try {
                                         reader = 
inputFormat.createRecordReader(inputSplits.get(i), context);
                                         reader.initialize(inputSplits.get(i), 
context);
-                                        while (reader.nextKeyValue() == true) {
-                                            String value = 
reader.getCurrentValue().toString();
-                                            String xml = start_tag;
-                                            xml = xml + value + end_tag;
-                                            System.out.println(xml);
-                                            //create an input stream to the 
file currently reading and send it to parser
-                                            InputStream stream = new 
ByteArrayInputStream(
-                                                    
xml.getBytes(StandardCharsets.UTF_8));
-                                            parser.parseHDFSElements(stream, 
writer, fta, i);
+                                        while (reader.nextKeyValue()) {
+                                            value = 
reader.getCurrentValue().toString();
+                                            //Split value if it contains more 
than one item with the tag
+                                            if 
(StringUtils.countMatches(value, tag) > 1) {
+                                                String items[] = 
value.split(tag);
+                                                for (String item : items) {
+                                                    if (item.length() > 0) {
+                                                        item = START_TAG + tag 
+ item;
+                                                        stream = new 
ByteArrayInputStream(
+                                                                
item.getBytes(StandardCharsets.UTF_8));
+                                                        
parser.parseHDFSElements(stream, writer, fta, i);
+                                                    }
+                                                }
+                                            } else {
+                                                value = START_TAG + value;
+                                                //create an input stream to 
the file currently reading and send it to parser
+                                                stream = new 
ByteArrayInputStream(
+                                                        
value.getBytes(StandardCharsets.UTF_8));
+                                                
parser.parseHDFSElements(stream, writer, fta, i);
+                                            }
                                         }
 
                                     } catch (InterruptedException e) {

Reply via email to