Author: rwesten
Date: Thu Dec 12 16:35:08 2013
New Revision: 1550448

URL: http://svn.apache.org/r1550448
Log:
STANBOL-1200: The RdfEntityDataIterator now uses an iterator that also works if 
the SesameRepository does not provide triples sorted by subject. This new 
iterator, however, requires keeping the subjects in memory to filter out duplicates.

Modified:
    
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java

Modified: 
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
URL: 
http://svn.apache.org/viewvc/stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java?rev=1550448&r1=1550447&r2=1550448&view=diff
==============================================================================
--- 
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
 (original)
+++ 
stanbol/trunk/entityhub/indexing/source/sesame/src/main/java/org/apache/stanbol/entityhub/indexing/source/sesame/RdfIndexingSource.java
 Thu Dec 12 16:35:08 2013
@@ -1,10 +1,14 @@
 package org.apache.stanbol.entityhub.indexing.source.sesame;
 
+import info.aduna.iteration.CloseableIteration;
+import info.aduna.iteration.ConvertingIteration;
+import info.aduna.iteration.DistinctIteration;
+import info.aduna.iteration.FilterIteration;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.URI;
 import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.HashSet;
@@ -32,6 +36,7 @@ import org.openrdf.model.Literal;
 import org.openrdf.model.Model;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.model.ValueFactory;
 import org.openrdf.repository.Repository;
@@ -392,71 +397,60 @@ public class RdfIndexingSource extends A
     protected class RdfEntityDataIterator implements EntityDataIterator {
 
         protected final RepositoryConnection connection;
-        protected final RepositoryResult<Statement> stdItr;
+        protected final CloseableIteration<URI,RepositoryException> subjectItr;
         protected final boolean followBNodes;
 
-        private org.openrdf.model.URI currentEntity = null;
-        /**
-         * The last {@link Statement} read from {@link #stdItr}
-         */
-        private Statement currentStd = null;
         /**
          * The current Representation as created by {@link #next()}
          */
         protected RdfRepresentation currentRep;
-        /**
-         * If the {@link #stdItr} is positioned on the 2nd {@link Statement} 
-         * of the next Entity and {@link #currentStd} holds the first one.
-         */
-        private boolean nextInitialised = false;
         
         protected RdfEntityDataIterator(boolean followBNodes,
                 boolean includeInferred, Resource...contexts) throws 
RepositoryException{
             this.connection = repository.getConnection();
-            stdItr = connection.getStatements(null, null, null, 
includeInferred, contexts);
+            CloseableIteration<URI, RepositoryException> converter = 
+                    new ConvertingIteration<Statement, URI, 
RepositoryException>(
+                            connection.getStatements(null, null, null, 
includeInferred, contexts)) {
+                @Override
+                protected URI convert(Statement sourceObject) throws 
RepositoryException {
+                    Resource r = sourceObject.getSubject();
+                    return r instanceof URI ? (URI)r : null;
+                }
+            };
+            CloseableIteration<URI,RepositoryException> filter = 
+                    new FilterIteration<URI,RepositoryException>(converter){
+                @Override
+                protected boolean accept(URI object) throws 
RepositoryException {
+                    return object != null;
+                }    
+            };
+            this.subjectItr = new DistinctIteration<URI, 
RepositoryException>(filter);
             this.followBNodes = followBNodes;
             entityDataIterators.add(this);
         }
         
         @Override
         public boolean hasNext() {
-            if(nextInitialised){
-                return true;
-            }
             try {
-                while(stdItr.hasNext() && (currentStd == null || 
-                        !(currentStd.getSubject() instanceof 
org.openrdf.model.URI))){
-                    currentStd = stdItr.next();
-                }
-                if(stdItr.hasNext()){
-                    nextInitialised = true;
-                }
-                return nextInitialised;
+                return subjectItr.hasNext();
             } catch (RepositoryException e) {
-                throw new IllegalArgumentException("Exceptions while reading "
-                        + "Statements after " + currentStd ,e);
+                throw new IllegalStateException("Exceptions while checking "
+                        + "for next subject" ,e);
             }
         }
 
         @Override
         public String next() {
-            if(nextInitialised || hasNext()){
-                final org.openrdf.model.URI subject = 
-                        (org.openrdf.model.URI)currentStd.getSubject();
+            URI subject = null;
+            try {
+                subject = subjectItr.next();
                 currentRep = vf.createRdfRepresentation(subject);
-                try {
                     createRepresentation(subject, currentRep.getModel());
-                } catch (RepositoryException e) {
-                    currentRep = null;
-                    throw new IllegalStateException("Unable to read statements 
"
-                        + "for Entity " + (currentStd != null ? 
currentStd.getSubject() :
-                            "") +"!",e);
-                }
-                nextInitialised = false;
                 return subject.toString();
-            } else {
+            } catch (RepositoryException e) {
                 currentRep = null;
-                throw new NoSuchElementException();
+                throw new IllegalStateException("Unable to read statements "
+                    + "for Entity " + (subject == null ? "unknown" : subject) 
+"!",e);
             }
         }
 
@@ -471,6 +465,8 @@ public class RdfIndexingSource extends A
          */
         protected void createRepresentation(org.openrdf.model.URI subject, 
final Model model)
                 throws RepositoryException {
+            RepositoryResult<Statement> stmts = connection.getStatements(
+                subject,null,null,includeInferred,contexts);
             final Set<BNode> bnodes;
             final Set<BNode> visited;
             if(followBNodeState){
@@ -480,11 +476,9 @@ public class RdfIndexingSource extends A
                 bnodes = null;
                 visited = null;
             }
-            boolean next = false;
-            while(!next && stdItr.hasNext()){
-                currentStd = stdItr.next();
-                next = !subject.equals(currentStd.getSubject());
-                if(!next){
+            try {
+                while(stmts.hasNext()){
+                    Statement currentStd = stmts.next();
                     model.add(currentStd);
                     if(followBNodeState){ //keep referenced BNodes
                         Value object = currentStd.getObject();
@@ -492,7 +486,9 @@ public class RdfIndexingSource extends A
                             bnodes.add((BNode)object);
                         }
                     } //else do not follow BNode values
-                } //else the subject has changed ... stop here
+                }
+            } finally {
+                stmts.close();
             }
             if(followBNodeState){ //process BNodes
                 for(BNode bnode : bnodes){
@@ -520,6 +516,9 @@ public class RdfIndexingSource extends A
         public void close() {
             entityDataIterators.remove(this);
             try {
+                subjectItr.close();
+            } catch (RepositoryException e) {/* ignore */}
+            try {
                 connection.close();
             } catch (RepositoryException e) { /* ignore */ }
         }
@@ -587,7 +586,7 @@ public class RdfIndexingSource extends A
     }
 
     @Override
-    public Literal createLiteral(String content, Locale language, URI type) {
+    public Literal createLiteral(String content, Locale language, java.net.URI 
type) {
         return createLiteralInternal(sesameFactory, content, language, type);
     }
 


Reply via email to