http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/VisibilitySimplifier.java
----------------------------------------------------------------------
diff --git 
a/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/VisibilitySimplifier.java
 
b/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/VisibilitySimplifier.java
new file mode 100644
index 0000000..a9d7468
--- /dev/null
+++ 
b/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/VisibilitySimplifier.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.model.visibility;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Charsets;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Simplifies Accumulo visibility expressions.
+ * 
+ * XXX
+ * This class has been copied over because Rya has decided to use the Accumulo
+ * implementation of visibilities to control who is able to access what data
+ * within a Rya instance. Until we implement an Accumulo agnostic method for
+ * handling those visibility expressions, we have chosen to pull the Accumulo
+ * code into our API.
+ *
+ * Copied from rya accumulo's 
org.apache.rya.accumulo.utils.VisibilitySimplifier
+ *   <dependency>
+ *     <groupId>org.apache.rya.accumulo</groupId>
+ *     <artifactId>accumulo.rya</artifactId>
+ *     <version>3.2.12-incubating-SNAPSHOT</version>
+ *   </dependency>
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilitySimplifier {
+
+    /**
+     * Unions two visibility equations and then simplifies the result.
+     *
+     * @param vis1 - The first visibility equation that will be unioned. (not 
null)
+     * @param vis2 - The other visibility equation that will be unioned. (not 
null)
+     * @return A simplified form of the unioned visibility equations.
+     */
+    public static String unionAndSimplify(final String vis1, final String 
vis2) {
+        requireNonNull(vis1);
+        requireNonNull(vis2);
+
+        if(vis1.isEmpty()) {
+            return vis2;
+        }
+
+        if(vis2.isEmpty()) {
+            return vis1;
+        }
+
+        return simplify("(" + vis1 + ")&(" + vis2 + ")");
+    }
+
+    /**
+     * Simplifies an Accumulo visibility expression.
+     *
+     * @param visibility - The expression to simplify. (not null)
+     * @return A simplified form of {@code visibility}.
+     */
+    public static String simplify(final String visibility) {
+        requireNonNull(visibility);
+
+        String last = visibility;
+        String simplified = new String(new 
ColumnVisibility(visibility).flatten(), Charsets.UTF_8);
+
+        while(!simplified.equals(last)) {
+            last = simplified;
+            simplified = new String(new 
ColumnVisibility(simplified).flatten(), Charsets.UTF_8);
+        }
+
+        return simplified;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/WritableComparator.java
----------------------------------------------------------------------
diff --git 
a/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/WritableComparator.java
 
b/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/WritableComparator.java
new file mode 100644
index 0000000..121da20
--- /dev/null
+++ 
b/common/rya.api.model/src/main/java/org/apache/rya/api/model/visibility/WritableComparator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rya.api.model.visibility;
+
+/**
+ * A Comparator for {@link WritableComparable}s.
+ *
+ * <p>
+ * This base implementation uses the natural ordering. To define alternate
+ * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
+ *
+ * <p>
+ * One may optimize compare-intensive operations by overriding
+ * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are
+ * provided to assist in optimized implementations of this method.
+ *
+ * XXX
+ * This class has been copied over because Rya has decided to use the Accumulo
+ * implementation of visibilities to control who is able to access what data
+ * within a Rya instance. Until we implement an Accumulo agnostic method for
+ * handling those visibility expressions, we have chosen to pull the Accumulo
+ * code into our API.
+ *
+ * Copied from hadoop's org.apache.hadoop.io.WritableComparator
+ *   <dependency>
+ *     <groupId>org.apache.hadoop</groupId>
+ *     <artifactId>hadoop-common</artifactId>
+ *     <version>2.5</version>
+ *   </dependency>
+ */
+public class WritableComparator {
+    /** Lexicographic order of binary data. */
+    public static int compareBytes(final byte[] b1, final int s1, final int 
l1, final byte[] b2, final int s2,
+            final int l2) {
+        return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/common/rya.api.model/src/test/java/org/apache/rya/api/model/visibility/VisibilitySimplifierTest.java
----------------------------------------------------------------------
diff --git 
a/common/rya.api.model/src/test/java/org/apache/rya/api/model/visibility/VisibilitySimplifierTest.java
 
b/common/rya.api.model/src/test/java/org/apache/rya/api/model/visibility/VisibilitySimplifierTest.java
new file mode 100644
index 0000000..30949ad
--- /dev/null
+++ 
b/common/rya.api.model/src/test/java/org/apache/rya/api/model/visibility/VisibilitySimplifierTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.model.visibility;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Tests the methods of {@link VisibilitySimplifier}.
+ * 
+ * XXX
+ * This class has been copied over because Rya has decided to use the Accumulo
+ * implementation of visibilities to control who is able to access what data
+ * within a Rya instance. Until we implement an Accumulo agnostic method for
+ * handling those visibility expressions, we have chosen to pull the Accumulo
+ * code into our API.
+ *
+ * Copied from rya accumulo's 
org.apache.rya.accumulo.utils.VisibilitySimplifierTest
+ *   <dependency>
+ *     <groupId>org.apache.rya.accumulo</groupId>
+ *     <artifactId>accumulo.rya</artifactId>
+ *     <version>3.2.12-incubating-SNAPSHOT</version>
+ *   </dependency>
+ */
+public class VisibilitySimplifierTest {
+
+    @Test
+    public void noneRequired() {
+        final String simplified = new VisibilitySimplifier().simplify("u");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void parenthesis() {
+        final String simplified = new 
VisibilitySimplifier().simplify("(u&u)&u");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void manyAnds() {
+        final String simplified = new VisibilitySimplifier().simplify("u&u&u");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void complex() {
+        final String simplified = new 
VisibilitySimplifier().simplify("(a|b)|(a|b)|a|b");
+        assertEquals("a|b", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify() {
+        final String simplified = new 
VisibilitySimplifier().unionAndSimplify("u&b", "u");
+        assertEquals("b&u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_firstIsEmpty() {
+        final String simplified = new 
VisibilitySimplifier().unionAndSimplify("", "u");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_secondIsEmpty() {
+        final String simplified = new 
VisibilitySimplifier().unionAndSimplify("u", "");
+        assertEquals("u", simplified);
+    }
+
+    @Test
+    public void unionAndSimplify_bothAreEmpty() {
+        final String simplified = new 
VisibilitySimplifier().unionAndSimplify("", "");
+        assertEquals("", simplified);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 4839c04..f2e8cf9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -59,6 +59,11 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api.function</artifactId>
+        </dependency>
 
         <!-- 3rd Party Runtime Dependencies. -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
index 2dc48f5..2d2bfa7 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
@@ -35,7 +35,10 @@ import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import 
org.apache.rya.indexing.pcj.fluo.app.batch.AbstractBatchBindingSetUpdater;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
@@ -47,9 +50,6 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -90,21 +90,21 @@ public class JoinResultUpdater extends AbstractNodeUpdater {
 
         log.trace(
                 "Transaction ID: " + tx.getStartTimestamp() + "\n" +
-                "Join Node ID: " + joinMetadata.getNodeId() + "\n" +
-                "Child Node ID: " + childNodeId + "\n" +
-                "Child Binding Set:\n" + childBindingSet + "\n");
+                        "Join Node ID: " + joinMetadata.getNodeId() + "\n" +
+                        "Child Node ID: " + childNodeId + "\n" +
+                        "Child Binding Set:\n" + childBindingSet + "\n");
 
         // Figure out which join algorithm we are going to use.
         final IterativeJoin joinAlgorithm;
         switch(joinMetadata.getJoinType()) {
-        case NATURAL_JOIN:
-            joinAlgorithm = new NaturalJoin();
-            break;
-        case LEFT_OUTER_JOIN:
-            joinAlgorithm = new LeftOuterJoin();
-            break;
-        default:
-            throw new RuntimeException("Unsupported JoinType: " + 
joinMetadata.getJoinType());
+            case NATURAL_JOIN:
+                joinAlgorithm = new NaturalJoin();
+                break;
+            case LEFT_OUTER_JOIN:
+                joinAlgorithm = new LeftOuterJoin();
+                break;
+            default:
+                throw new RuntimeException("Unsupported JoinType: " + 
joinMetadata.getJoinType());
         }
 
         // Figure out which side of the join the new binding set appeared on.
@@ -120,10 +120,10 @@ public class JoinResultUpdater extends 
AbstractNodeUpdater {
         }
 
         // Iterates over the sibling node's BindingSets that join with the new 
binding set.
-        Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>();
-        Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, 
siblingId);
-        Column siblingColumn = getScanColumnFamily(siblingId);
-        Optional<RowColumn> rowColumn = fillSiblingBatch(tx, siblingSpan, 
siblingColumn, siblingBindingSets, joinMetadata.getJoinBatchSize());
+        final Set<VisibilityBindingSet> siblingBindingSets = new HashSet<>();
+        final Span siblingSpan = getSpan(tx, childNodeId, childBindingSet, 
siblingId);
+        final Column siblingColumn = getScanColumnFamily(siblingId);
+        final Optional<RowColumn> rowColumn = fillSiblingBatch(tx, 
siblingSpan, siblingColumn, siblingBindingSets, 
joinMetadata.getJoinBatchSize());
 
         // Iterates over the resulting BindingSets from the join.
         final Iterator<VisibilityBindingSet> newJoinResults;
@@ -148,7 +148,7 @@ public class JoinResultUpdater extends AbstractNodeUpdater {
 
                 log.trace(
                         "Transaction ID: " + tx.getStartTimestamp() + "\n" +
-                        "New Join Result:\n" + newJoinResult + "\n");
+                                "New Join Result:\n" + newJoinResult + "\n");
 
                 tx.set(resultRow, FluoQueryColumns.JOIN_BINDING_SET, 
nodeValueBytes);
             }
@@ -157,29 +157,21 @@ public class JoinResultUpdater extends 
AbstractNodeUpdater {
         // if batch limit met, there are additional entries to process
         // update the span and register updated batch job
         if (rowColumn.isPresent()) {
-            Span newSpan = 
AbstractBatchBindingSetUpdater.getNewSpan(rowColumn.get(), siblingSpan);
-            JoinBatchInformation joinBatch = JoinBatchInformation.builder()
-                .setBatchSize(joinMetadata.getJoinBatchSize())
-                .setBs(childBindingSet)
-                .setColumn(siblingColumn)
-                .setJoinType(joinMetadata.getJoinType())
-                .setSide(emittingSide)
-                .setSpan(newSpan)
-                .setTask(Task.Add)
-                .build();
+            final Span newSpan = 
AbstractBatchBindingSetUpdater.getNewSpan(rowColumn.get(), siblingSpan);
+            final JoinBatchInformation joinBatch = 
JoinBatchInformation.builder()
+                    .setBatchSize(joinMetadata.getJoinBatchSize())
+                    .setBs(childBindingSet)
+                    .setColumn(siblingColumn)
+                    .setJoinType(joinMetadata.getJoinType())
+                    .setSide(emittingSide)
+                    .setSpan(newSpan)
+                    .setTask(Task.Add)
+                    .build();
             BatchInformationDAO.addBatch(tx, joinMetadata.getNodeId(), 
joinBatch);
         }
     }
 
     /**
-     * The different sides a new binding set may appear on.
-     */
-    public static enum Side {
-        LEFT, RIGHT;
-    }
-
-
-    /**
      * Fetches batch to be processed by scanning over the Span specified by the
      * {@link JoinBatchInformation}. The number of results is less than or 
equal
      * to the batch size specified by the JoinBatchInformation.
@@ -190,17 +182,17 @@ public class JoinResultUpdater extends 
AbstractNodeUpdater {
      * @return Set - containing results of sibling scan.
      * @throws Exception
      */
-    private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, Span 
siblingSpan, Column siblingColumn, Set<VisibilityBindingSet> bsSet, int 
batchSize) throws Exception {
+    private Optional<RowColumn> fillSiblingBatch(final TransactionBase tx, 
final Span siblingSpan, final Column siblingColumn, final 
Set<VisibilityBindingSet> bsSet, final int batchSize) throws Exception {
 
-        RowScanner rs = 
tx.scanner().over(siblingSpan).fetch(siblingColumn).byRow().build();
-        Iterator<ColumnScanner> colScannerIter = rs.iterator();
+        final RowScanner rs = 
tx.scanner().over(siblingSpan).fetch(siblingColumn).byRow().build();
+        final Iterator<ColumnScanner> colScannerIter = rs.iterator();
 
         boolean batchLimitMet = false;
         Bytes row = siblingSpan.getStart().getRow();
         while (colScannerIter.hasNext() && !batchLimitMet) {
-            ColumnScanner colScanner = colScannerIter.next();
+            final ColumnScanner colScanner = colScannerIter.next();
             row = colScanner.getRow();
-            Iterator<ColumnValue> iter = colScanner.iterator();
+            final Iterator<ColumnValue> iter = colScanner.iterator();
             while (iter.hasNext() && !batchLimitMet) {
                 bsSet.add(BS_SERDE.deserialize(iter.next().getValue()));
                 //check if batch size has been met and set flag if it has been 
met
@@ -271,7 +263,7 @@ public class JoinResultUpdater extends AbstractNodeUpdater {
     private VariableOrder removeBinIdFromVarOrder(VariableOrder varOrder) {
         List<String> varOrderList = varOrder.getVariableOrders();
         
if(varOrderList.get(0).equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
-            List<String> updatedVarOrderList = 
Lists.newArrayList(varOrderList);
+            final List<String> updatedVarOrderList = 
Lists.newArrayList(varOrderList);
             updatedVarOrderList.remove(0);
             return new VariableOrder(updatedVarOrderList);
         } else {
@@ -350,168 +342,4 @@ public class JoinResultUpdater extends 
AbstractNodeUpdater {
 
         return column;
     }
-
-    /**
-     * Defines each of the cases that may generate new join results when
-     * iteratively computing a query's join node.
-     */
-    public static interface IterativeJoin {
-
-        /**
-         * Invoked when a new {@link VisibilityBindingSet} is emitted from the 
left child
-         * node of the join. The Fluo table is scanned for results on the right
-         * side that will be joined with the new result.
-         *
-         * @param newLeftResult - A new VisibilityBindingSet that has been 
emitted from
-         *   the left child node.
-         * @param rightResults - The right child node's binding sets that will
-         *   be joined with the new left result. (not null)
-         * @return The new BindingSet results for the join.
-         */
-        public Iterator<VisibilityBindingSet> 
newLeftResult(VisibilityBindingSet newLeftResult, 
Iterator<VisibilityBindingSet> rightResults);
-
-        /**
-         * Invoked when a new {@link VisibilityBindingSet} is emitted from the 
right child
-         * node of the join. The Fluo table is scanned for results on the left
-         * side that will be joined with the new result.
-         *
-         * @param leftResults - The left child node's binding sets that will be
-         *   joined with the new right result.
-         * @param newRightResult - A new BindingSet that has been emitted from
-         *   the right child node.
-         * @return The new BindingSet results for the join.
-         */
-        public Iterator<VisibilityBindingSet> 
newRightResult(Iterator<VisibilityBindingSet> leftResults, VisibilityBindingSet 
newRightResult);
-    }
-
-    /**
-     * Implements an {@link IterativeJoin} that uses the Natural Join algorithm
-     * defined by Relational Algebra.
-     * <p>
-     * This is how you combine {@code BindnigSet}s that may have common Binding
-     * names. When two Binding Sets are joined, any bindings that appear in 
both
-     * binding sets are only included once.
-     */
-    public static final class NaturalJoin implements IterativeJoin {
-        @Override
-        public Iterator<VisibilityBindingSet> newLeftResult(final 
VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> 
rightResults) {
-            checkNotNull(newLeftResult);
-            checkNotNull(rightResults);
-
-            // Both sides are required, so if there are no right results, then 
do not emit anything.
-            return new LazyJoiningIterator(Side.LEFT, newLeftResult, 
rightResults);
-        }
-
-        @Override
-        public Iterator<VisibilityBindingSet> newRightResult(final 
Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet 
newRightResult) {
-            checkNotNull(leftResults);
-            checkNotNull(newRightResult);
-
-            // Both sides are required, so if there are no left reuslts, then 
do not emit anything.
-            return new LazyJoiningIterator(Side.RIGHT, newRightResult, 
leftResults);
-        }
-    }
-
-    /**
-     * Implements an {@link IterativeJoin} that uses the Left Outer Join
-     * algorithm defined by Relational Algebra.
-     * <p>
-     * This is how you add optional information to a {@link BindingSet}. Left
-     * binding sets are emitted even if they do not join with anything on the 
right.
-     * However, right binding sets must be joined with a left binding set.
-     */
-    public static final class LeftOuterJoin implements IterativeJoin {
-        @Override
-        public Iterator<VisibilityBindingSet> newLeftResult(final 
VisibilityBindingSet newLeftResult, final Iterator<VisibilityBindingSet> 
rightResults) {
-            checkNotNull(newLeftResult);
-            checkNotNull(rightResults);
-
-            // If the required portion does not join with any optional 
portions,
-            // then emit a BindingSet that matches the new left result.
-            if(!rightResults.hasNext()) {
-                return 
Lists.<VisibilityBindingSet>newArrayList(newLeftResult).iterator();
-            }
-
-            // Otherwise, return an iterator that holds the new required result
-            // joined with the right results.
-            return new LazyJoiningIterator(Side.LEFT, newLeftResult, 
rightResults);
-        }
-
-        @Override
-        public Iterator<VisibilityBindingSet> newRightResult(final 
Iterator<VisibilityBindingSet> leftResults, final VisibilityBindingSet 
newRightResult) {
-            checkNotNull(leftResults);
-            checkNotNull(newRightResult);
-
-            // The right result is optional, so if it does not join with 
anything
-            // on the left, then do not emit anything.
-            return new LazyJoiningIterator(Side.RIGHT, newRightResult, 
leftResults);
-        }
-    }
-
-    /**
-     * Joins a {@link BindingSet} (which is new to the left or right side of a 
join)
-     * to all binding sets on the other side that join with it.
-     * <p>
-     * This is done lazily so that you don't have to load all of the 
BindingSets
-     * into memory at once.
-     */
-    private static final class LazyJoiningIterator implements 
Iterator<VisibilityBindingSet> {
-
-        private final Side newResultSide;
-        private final VisibilityBindingSet newResult;
-        private final Iterator<VisibilityBindingSet> joinedResults;
-
-        /**
-         * Constructs an instance of {@link LazyJoiningIterator}.
-         *
-         * @param newResultSide - Indicates which side of the join the {@code 
newResult} arrived on. (not null)
-         * @param newResult - A binding set that will be joined with some 
other binding sets. (not null)
-         * @param joinedResults - The binding sets that will be joined with 
{@code newResult}. (not null)
-         */
-        public LazyJoiningIterator(final Side newResultSide, final 
VisibilityBindingSet newResult, final Iterator<VisibilityBindingSet> 
joinedResults) {
-            this.newResultSide = checkNotNull(newResultSide);
-            this.newResult = checkNotNull(newResult);
-            this.joinedResults = checkNotNull(joinedResults);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return joinedResults.hasNext();
-        }
-
-        @Override
-        public VisibilityBindingSet next() {
-            final MapBindingSet bs = new MapBindingSet();
-
-            for(final Binding binding : newResult) {
-                bs.addBinding(binding);
-            }
-
-            final VisibilityBindingSet joinResult = joinedResults.next();
-            for(final Binding binding : joinResult) {
-                bs.addBinding(binding);
-            }
-
-            // We want to make sure the visibilities are always written the 
same way,
-            // so figure out which are on the left side and which are on the 
right side.
-            final String leftVisi;
-            final String rightVisi;
-            if(newResultSide == Side.LEFT) {
-                leftVisi = newResult.getVisibility();
-                rightVisi = joinResult.getVisibility();
-            } else {
-                leftVisi = joinResult.getVisibility();
-                rightVisi = newResult.getVisibility();
-            }
-            final String visibility = 
VisibilitySimplifier.unionAndSimplify(leftVisi, rightVisi);
-
-            return new VisibilityBindingSet(bs, visibility);
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("remove() is 
unsupported.");
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
index 0d46e19..98f35cb 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
@@ -31,11 +31,11 @@ import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
+import org.apache.rya.api.function.join.LeftOuterJoin;
+import org.apache.rya.api.function.join.NaturalJoin;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -66,12 +66,12 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
      * @throws Exception
      */
     @Override
-    public void processBatch(TransactionBase tx, Bytes row, BatchInformation 
batch) throws Exception {
+    public void processBatch(final TransactionBase tx, final Bytes row, final 
BatchInformation batch) throws Exception {
         super.processBatch(tx, row, batch);
-        String nodeId = BatchRowKeyUtil.getNodeId(row);
+        final String nodeId = BatchRowKeyUtil.getNodeId(row);
         Preconditions.checkArgument(batch instanceof JoinBatchInformation);
-        JoinBatchInformation joinBatch = (JoinBatchInformation) batch;
-        Task task = joinBatch.getTask();
+        final JoinBatchInformation joinBatch = (JoinBatchInformation) batch;
+        final Task task = joinBatch.getTask();
 
         // Figure out which join algorithm we are going to use.
         final IterativeJoin joinAlgorithm;
@@ -86,12 +86,12 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
             throw new RuntimeException("Unsupported JoinType: " + 
joinBatch.getJoinType());
         }
 
-        Set<VisibilityBindingSet> bsSet = new HashSet<>();
-        Optional<RowColumn> rowCol = fillSiblingBatch(tx, joinBatch, bsSet);
+        final Set<VisibilityBindingSet> bsSet = new HashSet<>();
+        final Optional<RowColumn> rowCol = fillSiblingBatch(tx, joinBatch, 
bsSet);
 
         // Iterates over the resulting BindingSets from the join.
         final Iterator<VisibilityBindingSet> newJoinResults;
-        VisibilityBindingSet bs = joinBatch.getBs();
+        final VisibilityBindingSet bs = joinBatch.getBs();
         if (joinBatch.getSide() == Side.LEFT) {
             newJoinResults = joinAlgorithm.newLeftResult(bs, bsSet.iterator());
         } else {
@@ -104,7 +104,7 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
         while (newJoinResults.hasNext()) {
             final VisibilityBindingSet newJoinResult = newJoinResults.next();
             //create BindingSet value
-            Bytes bsBytes = BS_SERDE.serialize(newJoinResult);
+            final Bytes bsBytes = BS_SERDE.serialize(newJoinResult);
             //make rowId
             Bytes rowKey = BindingHashShardingFunction.addShard(nodeId, 
joinVarOrder, newJoinResult);
             final Column col = FluoQueryColumns.JOIN_BINDING_SET;
@@ -114,14 +114,14 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
         // if batch limit met, there are additional entries to process
         // update the span and register updated batch job
         if (rowCol.isPresent()) {
-            Span newSpan = getNewSpan(rowCol.get(), joinBatch.getSpan());
+            final Span newSpan = getNewSpan(rowCol.get(), joinBatch.getSpan());
             joinBatch.setSpan(newSpan);
             BatchInformationDAO.addBatch(tx, nodeId, joinBatch);
         }
 
     }
 
-    private void processTask(TransactionBase tx, Task task, Bytes row, Column 
column, Bytes value) {
+    private void processTask(final TransactionBase tx, final Task task, final 
Bytes row, final Column column, final Bytes value) {
         switch (task) {
         case Add:
             tx.set(row, column, value);
@@ -149,21 +149,21 @@ public class JoinBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
      * @return Set - containing results of sibling scan.
      * @throws Exception
      */
-    private Optional<RowColumn> fillSiblingBatch(TransactionBase tx, 
JoinBatchInformation batch, Set<VisibilityBindingSet> bsSet) throws Exception {
+    private Optional<RowColumn> fillSiblingBatch(final TransactionBase tx, 
final JoinBatchInformation batch, final Set<VisibilityBindingSet> bsSet) throws 
Exception {
 
-        Span span = batch.getSpan();
-        Column column = batch.getColumn();
-        int batchSize = batch.getBatchSize();
+        final Span span = batch.getSpan();
+        final Column column = batch.getColumn();
+        final int batchSize = batch.getBatchSize();
 
-        RowScanner rs = tx.scanner().over(span).fetch(column).byRow().build();
-        Iterator<ColumnScanner> colScannerIter = rs.iterator();
+        final RowScanner rs = 
tx.scanner().over(span).fetch(column).byRow().build();
+        final Iterator<ColumnScanner> colScannerIter = rs.iterator();
 
         boolean batchLimitMet = false;
         Bytes row = span.getStart().getRow();
         while (colScannerIter.hasNext() && !batchLimitMet) {
-            ColumnScanner colScanner = colScannerIter.next();
+            final ColumnScanner colScanner = colScannerIter.next();
             row = colScanner.getRow();
-            Iterator<ColumnValue> iter = colScanner.iterator();
+            final Iterator<ColumnValue> iter = colScanner.iterator();
             while (iter.hasNext()) {
                 if (bsSet.size() >= batchSize) {
                     batchLimitMet = true;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index d46cccd..ace9e76 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -21,8 +21,8 @@ import java.util.Objects;
 
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import org.openrdf.query.Binding;
 
@@ -49,9 +49,9 @@ import org.openrdf.query.Binding;
 public class JoinBatchInformation extends AbstractSpanBatchInformation {
 
     private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
-    private VisibilityBindingSet bs; //update for join child indicated by side
-    private Side side;  //join child that was updated by bs
-    private JoinType join;
+    private final VisibilityBindingSet bs; //update for join child indicated 
by side
+    private final Side side;  //join child that was updated by bs
+    private final JoinType join;
     /**
      * @param batchSize - batch size that Tasks are performed in
      * @param task - Add, Delete, or Update
@@ -61,14 +61,14 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
      * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
      * @param join - JoinType (left, right, natural inner)
      */
-    public JoinBatchInformation(int batchSize, Task task, Column column, Span 
span, VisibilityBindingSet bs, Side side, JoinType join) {
+    public JoinBatchInformation(final int batchSize, final Task task, final 
Column column, final Span span, final VisibilityBindingSet bs, final Side side, 
final JoinType join) {
         super(batchSize, task, column, span);
         this.bs = Objects.requireNonNull(bs);
         this.side = Objects.requireNonNull(side);
         this.join = Objects.requireNonNull(join);
     }
 
-    public JoinBatchInformation(Task task, Column column, Span span, 
VisibilityBindingSet bs, Side side, JoinType join) {
+    public JoinBatchInformation(final Task task, final Column column, final 
Span span, final VisibilityBindingSet bs, final Side side, final JoinType join) 
{
         this(DEFAULT_BATCH_SIZE, task, column, span, bs, side, join);
     }
 
@@ -123,7 +123,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
     }
 
     @Override
-    public boolean equals(Object other) {
+    public boolean equals(final Object other) {
         if (this == other) {
             return true;
         }
@@ -132,9 +132,9 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
             return false;
         }
 
-        JoinBatchInformation batch = (JoinBatchInformation) other;
-        return super.equals(other) &&  Objects.equals(this.bs, batch.bs) && 
Objects.equals(this.join, batch.join)
-                && Objects.equals(this.side, batch.side);
+        final JoinBatchInformation batch = (JoinBatchInformation) other;
+        return super.equals(other) &&  Objects.equals(bs, batch.bs) && 
Objects.equals(join, batch.join)
+                && Objects.equals(side, batch.side);
     }
 
     @Override
@@ -160,7 +160,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
         /**
          * @param batchSize - batch size that {@link Task}s are performed in
          */
-        public Builder setBatchSize(int batchSize) {
+        public Builder setBatchSize(final int batchSize) {
             this.batchSize = batchSize;
             return this;
         }
@@ -168,7 +168,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
         /**
          * @param task - Task performed (Add, Delete, Update)
          */
-        public Builder setTask(Task task) {
+        public Builder setTask(final Task task) {
             this.task = task;
             return this;
         }
@@ -176,7 +176,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
         /**
          * @param column - Column of join child to be scanned
          */
-        public Builder setColumn(Column column) {
+        public Builder setColumn(final Column column) {
             this.column = column;
             return this;
         }
@@ -188,7 +188,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
          * the common variables of the left and right join children.
          * @param span - Span over join child to be scanned
          */
-        public Builder setSpan(Span span) {
+        public Builder setSpan(final Span span) {
             this.span = span;
             return this;
         }
@@ -198,7 +198,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
          * by Side.
          * @param bs - BindingSet update of join child to be joined with 
results of scan
          */
-        public Builder setBs(VisibilityBindingSet bs) {
+        public Builder setBs(final VisibilityBindingSet bs) {
             this.bs = bs;
             return this;
         }
@@ -206,7 +206,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
         /**
          * @param join - JoinType (left, right, natural inner)
          */
-        public Builder setJoinType(JoinType join) {
+        public Builder setJoinType(final JoinType join) {
             this.join = join;
             return this;
         }
@@ -215,7 +215,7 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
          * Indicates the join child corresponding to the VisibilityBindingSet 
update
          * @param side - side of join the child BindingSet update appeared at
          */
-        public Builder setSide(Side side) {
+        public Builder setSide(final Side side) {
             this.side = side;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
index 0687a3e..1e7eba0 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
@@ -22,8 +22,8 @@ import java.lang.reflect.Type;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
@@ -49,42 +49,42 @@ public class JoinBatchInformationTypeAdapter implements 
JsonSerializer<JoinBatch
     private static final VisibilityBindingSetStringConverter converter = new 
VisibilityBindingSetStringConverter();
 
     @Override
-    public JsonElement serialize(JoinBatchInformation batch, Type typeOfSrc, 
JsonSerializationContext context) {
-        JsonObject result = new JsonObject();
+    public JsonElement serialize(final JoinBatchInformation batch, final Type 
typeOfSrc, final JsonSerializationContext context) {
+        final JsonObject result = new JsonObject();
         result.add("class", new JsonPrimitive(batch.getClass().getName()));
         result.add("batchSize", new JsonPrimitive(batch.getBatchSize()));
         result.add("task", new JsonPrimitive(batch.getTask().name()));
-        Column column = batch.getColumn();
+        final Column column = batch.getColumn();
         result.add("column", new JsonPrimitive(column.getsFamily() + "\u0000" 
+ column.getsQualifier()));
-        Span span = batch.getSpan();
+        final Span span = batch.getSpan();
         result.add("span", new JsonPrimitive(span.getStart().getsRow() + 
"\u0000" + span.getEnd().getsRow()));
         result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
         result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
         result.add("side", new JsonPrimitive(batch.getSide().name()));
         result.add("joinType", new JsonPrimitive(batch.getJoinType().name()));
-        String updateVarOrderString = 
Joiner.on(";").join(batch.getBs().getBindingNames());
-        VariableOrder updateVarOrder = new VariableOrder(updateVarOrderString);
+        final String updateVarOrderString = 
Joiner.on(";").join(batch.getBs().getBindingNames());
+        final VariableOrder updateVarOrder = new 
VariableOrder(updateVarOrderString);
         result.add("bindingSet", new 
JsonPrimitive(converter.convert(batch.getBs(), updateVarOrder)));
         result.add("updateVarOrder", new JsonPrimitive(updateVarOrderString));
         return result;
     }
 
     @Override
-    public JoinBatchInformation deserialize(JsonElement element, Type typeOfT, 
JsonDeserializationContext context)
+    public JoinBatchInformation deserialize(final JsonElement element, final 
Type typeOfT, final JsonDeserializationContext context)
             throws JsonParseException {
-        JsonObject json = element.getAsJsonObject();
-        int batchSize = json.get("batchSize").getAsInt();
-        Task task = Task.valueOf(json.get("task").getAsString());
-        String[] colArray = json.get("column").getAsString().split("\u0000");
-        Column column = new Column(colArray[0], colArray[1]);
-        String[] rows = json.get("span").getAsString().split("\u0000");
-        boolean startInc = json.get("startInc").getAsBoolean();
-        boolean endInc = json.get("endInc").getAsBoolean();
-        Span span = new Span(new RowColumn(rows[0]), startInc, new 
RowColumn(rows[1]), endInc);
-        VariableOrder updateVarOrder = new 
VariableOrder(json.get("updateVarOrder").getAsString());
-        VisibilityBindingSet bs = 
converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
-        Side side = Side.valueOf(json.get("side").getAsString());
-        JoinType join = JoinType.valueOf(json.get("joinType").getAsString());
+        final JsonObject json = element.getAsJsonObject();
+        final int batchSize = json.get("batchSize").getAsInt();
+        final Task task = Task.valueOf(json.get("task").getAsString());
+        final String[] colArray = 
json.get("column").getAsString().split("\u0000");
+        final Column column = new Column(colArray[0], colArray[1]);
+        final String[] rows = json.get("span").getAsString().split("\u0000");
+        final boolean startInc = json.get("startInc").getAsBoolean();
+        final boolean endInc = json.get("endInc").getAsBoolean();
+        final Span span = new Span(new RowColumn(rows[0]), startInc, new 
RowColumn(rows[1]), endInc);
+        final VariableOrder updateVarOrder = new 
VariableOrder(json.get("updateVarOrder").getAsString());
+        final VisibilityBindingSet bs = 
converter.convert(json.get("bindingSet").getAsString(), updateVarOrder);
+        final Side side = Side.valueOf(json.get("side").getAsString());
+        final JoinType join = 
JoinType.valueOf(json.get("joinType").getAsString());
         return 
JoinBatchInformation.builder().setBatchSize(batchSize).setTask(task).setSpan(span).setColumn(column).setBs(bs)
                .setSide(side).setJoinType(join).build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
index 7cfa73c..c58a27b 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
@@ -49,11 +49,11 @@ public class ExporterManager implements AutoCloseable {
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
     private static final RyaSubGraphKafkaSerDe SG_SERDE = new 
RyaSubGraphKafkaSerDe();
-    private Map<String, String> simplifiedVisibilities = new HashMap<>();
+    private final Map<String, String> simplifiedVisibilities = new HashMap<>();
     
-    private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> 
exporters;
+    private final Map<QueryType, Map<ExportStrategy, 
IncrementalResultExporter>> exporters;
     
-    private ExporterManager(Map<QueryType, Map<ExportStrategy, 
IncrementalResultExporter>> exporters) {
+    private ExporterManager(final Map<QueryType, Map<ExportStrategy, 
IncrementalResultExporter>> exporters) {
         this.exporters = Preconditions.checkNotNull(exporters);
     }
     
@@ -73,9 +73,9 @@ public class ExporterManager implements AutoCloseable {
      * @param data - Serialized result to be exported
      * @throws ResultExportException 
      */
-    public void export(QueryType type, Set<ExportStrategy> strategies, String 
queryId, Bytes data) throws ResultExportException {
+    public void export(final QueryType type, final Set<ExportStrategy> 
strategies, final String queryId, final Bytes data) throws 
ResultExportException {
         
-        String pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
+        final String pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
         
         if(type == QueryType.CONSTRUCT) {
             exportSubGraph(exporters.get(type), strategies, pcjId, data);
@@ -93,21 +93,21 @@ public class ExporterManager implements AutoCloseable {
      * @param data - serialized BindingSet result
      * @throws ResultExportException
      */
-    private void exportBindingSet(Map<ExportStrategy, 
IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String 
pcjId, Bytes data) throws ResultExportException {
+    private void exportBindingSet(final Map<ExportStrategy, 
IncrementalResultExporter> exporters, final Set<ExportStrategy> strategies, 
final String pcjId, final Bytes data) throws ResultExportException {
         VisibilityBindingSet bs;
         try {
             bs = BS_SERDE.deserialize(data);
             simplifyVisibilities(bs);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new ResultExportException("Unable to deserialize the given 
BindingSet.", e);
         }
             
         try{
-            for(ExportStrategy strategy: strategies) {
-                IncrementalBindingSetExporter exporter = 
(IncrementalBindingSetExporter) exporters.get(strategy);
+            for(final ExportStrategy strategy: strategies) {
+                final IncrementalBindingSetExporter exporter = 
(IncrementalBindingSetExporter) exporters.get(strategy);
                 exporter.export(pcjId, bs);
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new ResultExportException("Unable to export the given 
BindingSet " + bs + " with the given set of ExportStrategies " + strategies, e);
         }
     }
@@ -120,27 +120,27 @@ public class ExporterManager implements AutoCloseable {
      * @param data - serialized RyaSubGraph result
      * @throws ResultExportException
      */
-    private void exportSubGraph(Map<ExportStrategy, IncrementalResultExporter> 
exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws 
ResultExportException {
-        RyaSubGraph subGraph = SG_SERDE.fromBytes(data.toArray());
+    private void exportSubGraph(final Map<ExportStrategy, 
IncrementalResultExporter> exporters, final Set<ExportStrategy> strategies, 
final String pcjId, final Bytes data) throws ResultExportException {
+        final RyaSubGraph subGraph = SG_SERDE.fromBytes(data.toArray());
         
         try {
             simplifyVisibilities(subGraph);
-        } catch (UnsupportedEncodingException e) {
+        } catch (final UnsupportedEncodingException e) {
             throw new ResultExportException("Undable to deserialize provided 
RyaSubgraph", e);
         }
         
         try {
-            for (ExportStrategy strategy : strategies) {
-                IncrementalRyaSubGraphExporter exporter = 
(IncrementalRyaSubGraphExporter) exporters.get(strategy);
+            for (final ExportStrategy strategy : strategies) {
+                final IncrementalRyaSubGraphExporter exporter = 
(IncrementalRyaSubGraphExporter) exporters.get(strategy);
                 exporter.export(pcjId, subGraph);
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new ResultExportException(
                     "Unable to export the given subgraph " + subGraph + " 
using all of the ExportStrategies " + strategies);
         }
     }
     
-    private void simplifyVisibilities(VisibilityBindingSet result) {
+    private void simplifyVisibilities(final VisibilityBindingSet result) {
         // Simplify the result's visibilities.
         final String visibility = result.getVisibility();
         if(!simplifiedVisibilities.containsKey(visibility)) {
@@ -150,19 +150,19 @@ public class ExporterManager implements AutoCloseable {
         result.setVisibility( simplifiedVisibilities.get(visibility) );
     }
     
-    private void simplifyVisibilities(RyaSubGraph subgraph) throws 
UnsupportedEncodingException {
-        Set<RyaStatement> statements = subgraph.getStatements();
+    private void simplifyVisibilities(final RyaSubGraph subgraph) throws 
UnsupportedEncodingException {
+        final Set<RyaStatement> statements = subgraph.getStatements();
         if (statements.size() > 0) {
-            byte[] visibilityBytes = 
statements.iterator().next().getColumnVisibility();
+            final byte[] visibilityBytes = 
statements.iterator().next().getColumnVisibility();
             // Simplify the result's visibilities and cache new simplified
             // visibilities
-            String visibility = new String(visibilityBytes, "UTF-8");
+            final String visibility = new String(visibilityBytes, "UTF-8");
             if (!simplifiedVisibilities.containsKey(visibility)) {
-                String simplified = VisibilitySimplifier.simplify(visibility);
+                final String simplified = 
VisibilitySimplifier.simplify(visibility);
                 simplifiedVisibilities.put(visibility, simplified);
             }
 
-            for (RyaStatement statement : statements) {
+            for (final RyaStatement statement : statements) {
                 
statement.setColumnVisibility(simplifiedVisibilities.get(visibility).getBytes("UTF-8"));
             }
             
@@ -172,25 +172,25 @@ public class ExporterManager implements AutoCloseable {
     
     public static class Builder {
         
-        private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> 
exporters = new HashMap<>();
+        private final Map<QueryType, Map<ExportStrategy, 
IncrementalResultExporter>> exporters = new HashMap<>();
         
         /**
          * Add an {@link IncrementalResultExporter} to be used by this 
ExporterManager for exporting results
          * @param exporter - IncrementalResultExporter for exporting query 
results
          * @return - Builder for chaining method calls
          */
-        public Builder addIncrementalResultExporter(IncrementalResultExporter 
exporter) {
+        public Builder addIncrementalResultExporter(final 
IncrementalResultExporter exporter) {
             
-            Set<QueryType> types = exporter.getQueryTypes();
-            ExportStrategy strategy = exporter.getExportStrategy();
+            final Set<QueryType> types = exporter.getQueryTypes();
+            final ExportStrategy strategy = exporter.getExportStrategy();
             
-            for (QueryType type : types) {
+            for (final QueryType type : types) {
                 if (!exporters.containsKey(type)) {
-                    Map<ExportStrategy, IncrementalResultExporter> exportMap = 
new HashMap<>();
+                    final Map<ExportStrategy, IncrementalResultExporter> 
exportMap = new HashMap<>();
                     exportMap.put(strategy, exporter);
                     exporters.put(type, exportMap);
                 } else {
-                    Map<ExportStrategy, IncrementalResultExporter> exportMap = 
exporters.get(type);
+                    final Map<ExportStrategy, IncrementalResultExporter> 
exportMap = exporters.get(type);
                     if (!exportMap.containsKey(strategy)) {
                         exportMap.put(strategy, exporter);
                     }
@@ -212,10 +212,10 @@ public class ExporterManager implements AutoCloseable {
     @Override
     public void close() throws Exception {
         
-        Collection<Map<ExportStrategy, IncrementalResultExporter>> values = 
exporters.values();
+        final Collection<Map<ExportStrategy, IncrementalResultExporter>> 
values = exporters.values();
         
-        for(Map<ExportStrategy, IncrementalResultExporter> map: values) {
-            for(IncrementalResultExporter exporter: map.values()) {
+        for(final Map<ExportStrategy, IncrementalResultExporter> map: values) {
+            for(final IncrementalResultExporter exporter: map.values()) {
                 exporter.close();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/IterativeJoinTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/IterativeJoinTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/IterativeJoinTest.java
deleted file mode 100644
index e1324c7..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/IterativeJoinTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.impl.MapBindingSet;
-
-/**
- * Tests the methods of {@link IterativeJoin}.
- */
-@RunWith(Parameterized.class)
-public class IterativeJoinTest {
-
-    @Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            {new NaturalJoin()},
-            {new LeftOuterJoin()}
-           });
-    }
-
-    @Parameter
-    public IterativeJoin join;
-
-    /**
-     * This test ensures the same binding sets are created as the result of a
-     * {@link IterativeJoin} regardless of which side the notification is 
triggered on.
-     */
-    @Test
-    public void naturalJoin_sideDoesNotMatter() {
-        // Create the binding sets that will be joined.
-        final ValueFactory vf = new ValueFactoryImpl();
-
-        final MapBindingSet bs1 = new MapBindingSet();
-        bs1.addBinding("id", vf.createLiteral("some_uid"));
-        bs1.addBinding("name", vf.createLiteral("Alice"));
-        final VisibilityBindingSet vbs1 = new VisibilityBindingSet(bs1, "a");
-
-        final MapBindingSet bs2 = new MapBindingSet();
-        bs2.addBinding("id", vf.createLiteral("some_uid"));
-        bs2.addBinding("hair", vf.createLiteral("brown"));
-        final VisibilityBindingSet vbs2 = new VisibilityBindingSet(bs2, "b");
-
-        // new vbs1 shows up on the left, matches vbs2 on the right
-        final Iterator<VisibilityBindingSet> newLeftIt = 
join.newLeftResult(vbs1, Collections.singleton(vbs2).iterator());
-        final VisibilityBindingSet newLeftResult = newLeftIt.next();
-
-        // new vbs2 shows up on the right, matches vbs1 on the left
-        final Iterator<VisibilityBindingSet> newRightIt = 
join.newRightResult(Collections.singleton(vbs1).iterator(), vbs2);
-        final VisibilityBindingSet newRightResult = newRightIt.next();
-
-        // Ensure those two results are the same.
-        assertEquals(newLeftResult, newRightResult);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
deleted file mode 100644
index 54051ab..0000000
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/LeftOuterJoinTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.LeftOuterJoin;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Tests the methods of {@link LeftOuterJoin}.
- */
-public class LeftOuterJoinTest {
-
-    private final ValueFactory vf = new ValueFactoryImpl();
-
-    @Test
-    public void newLeftResult_noRightMatches() {
-        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
-
-        // There is a new left result.
-        final MapBindingSet mapLeftResult = new MapBindingSet();
-        mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
-        final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
-
-        // There are no right results that join with the left result.
-        final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
-
-        // Therefore, the left result is a new join result.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
-
-        final Set<BindingSet> newJoinResults = new HashSet<>();
-        while(newJoinResultsIt.hasNext()) {
-            newJoinResults.add( newJoinResultsIt.next() );
-        }
-
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet( newLeftResult );
-
-        assertEquals(expected, newJoinResults);
-    }
-
-    @Test
-    public void newLeftResult_joinsWithRightResults() {
-        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
-
-        // There is a new left result.
-        final MapBindingSet mapLeftResult = new MapBindingSet();
-        mapLeftResult.addBinding("name", vf.createLiteral("Bob"));
-        mapLeftResult.addBinding("height", vf.createLiteral("5'9\""));
-        final VisibilityBindingSet newLeftResult = new VisibilityBindingSet(mapLeftResult);
-
-        // There are a few right results that join with the left result.
-        final MapBindingSet nameAge = new MapBindingSet();
-        nameAge.addBinding("name", vf.createLiteral("Bob"));
-        nameAge.addBinding("age", vf.createLiteral(56));
-        final VisibilityBindingSet visiAge = new VisibilityBindingSet(nameAge);
-
-        final MapBindingSet nameHair = new MapBindingSet();
-        nameHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        final VisibilityBindingSet visiHair = new VisibilityBindingSet(nameHair);
-
-        final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(visiAge, visiHair).iterator();
-
-        // Therefore, there are a few new join results that mix the two together.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newLeftResult(newLeftResult, rightResults);
-
-        final Set<BindingSet> newJoinResults = new HashSet<>();
-        while(newJoinResultsIt.hasNext()) {
-            newJoinResults.add( newJoinResultsIt.next() );
-        }
-
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        final MapBindingSet nameHeightAge = new MapBindingSet();
-        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(new VisibilityBindingSet(nameHeightAge));
-
-        final MapBindingSet nameHeightHair = new MapBindingSet();
-        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(new VisibilityBindingSet(nameHeightHair));
-
-        assertEquals(expected, newJoinResults);
-    }
-
-    @Test
-    public void newRightResult_noLeftMatches() {
-        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
-
-        // There are no left results.
-        final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
-
-        // There is a new right result.
-        final MapBindingSet newRightResult = new MapBindingSet();
-        newRightResult.addBinding("name", vf.createLiteral("Bob"));
-
-        // Therefore, there are no new join results.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
-        assertFalse( newJoinResultsIt.hasNext() );
-    }
-
-    @Test
-    public void newRightResult_joinsWithLeftResults() {
-        final IterativeJoin leftOuterJoin = new LeftOuterJoin();
-
-        // There are a few left results that join with the new right result.
-        final MapBindingSet nameAge = new MapBindingSet();
-        nameAge.addBinding("name", vf.createLiteral("Bob"));
-        nameAge.addBinding("age", vf.createLiteral(56));
-
-        final MapBindingSet nameHair = new MapBindingSet();
-        nameHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
-
-        final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
-                new VisibilityBindingSet(nameAge),
-                new VisibilityBindingSet(nameHair)).iterator();
-
-        // There is a new right result.
-        final MapBindingSet newRightResult = new MapBindingSet();
-        newRightResult.addBinding("name", vf.createLiteral("Bob"));
-        newRightResult.addBinding("height", vf.createLiteral("5'9\""));
-
-        // Therefore, there are a few new join results that mix the two together.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = leftOuterJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
-
-        final Set<BindingSet> newJoinResults = new HashSet<>();
-        while(newJoinResultsIt.hasNext()) {
-            newJoinResults.add( newJoinResultsIt.next() );
-        }
-
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        final MapBindingSet nameHeightAge = new MapBindingSet();
-        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(new VisibilityBindingSet(nameHeightAge));
-
-        final MapBindingSet nameHeightHair = new MapBindingSet();
-        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(new VisibilityBindingSet(nameHeightHair));
-
-        assertEquals(expected, newJoinResults);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
deleted file mode 100644
index c41f637..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/NaturalJoinTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.IterativeJoin;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.NaturalJoin;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Tests the methods of {@link NaturalJoin}.
- */
-public class NaturalJoinTest {
-
-    private final ValueFactory vf = new ValueFactoryImpl();
-
-    @Test
-    public void newLeftResult_noRightMatches() {
-        final IterativeJoin naturalJoin = new NaturalJoin();
-
-        // There is a new left result.
-        final MapBindingSet newLeftResult = new MapBindingSet();
-        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
-
-        // There are no right results that join with the left result.
-        final Iterator<VisibilityBindingSet> rightResults= new ArrayList<VisibilityBindingSet>().iterator();
-
-        // Therefore, the left result is a new join result.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
-        assertFalse( newJoinResultsIt.hasNext() );
-    }
-
-    @Test
-    public void newLeftResult_joinsWithRightResults() {
-        final IterativeJoin naturalJoin = new NaturalJoin();
-
-        // There is a new left result.
-        final MapBindingSet newLeftResult = new MapBindingSet();
-        newLeftResult.addBinding("name", vf.createLiteral("Bob"));
-        newLeftResult.addBinding("height", vf.createLiteral("5'9\""));
-
-        // There are a few right results that join with the left result.
-        final MapBindingSet nameAge = new MapBindingSet();
-        nameAge.addBinding("name", vf.createLiteral("Bob"));
-        nameAge.addBinding("age", vf.createLiteral(56));
-
-        final MapBindingSet nameHair = new MapBindingSet();
-        nameHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
-
-        final Iterator<VisibilityBindingSet> rightResults = Lists.<VisibilityBindingSet>newArrayList(
-                new VisibilityBindingSet(nameAge),
-                new VisibilityBindingSet(nameHair)).iterator();
-
-        // Therefore, there are a few new join results that mix the two together.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newLeftResult(new VisibilityBindingSet(newLeftResult), rightResults);
-
-        final Set<BindingSet> newJoinResults = new HashSet<>();
-        while(newJoinResultsIt.hasNext()) {
-            newJoinResults.add( newJoinResultsIt.next() );
-        }
-
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        final MapBindingSet nameHeightAge = new MapBindingSet();
-        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(new VisibilityBindingSet(nameHeightAge));
-
-        final MapBindingSet nameHeightHair = new MapBindingSet();
-        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(new VisibilityBindingSet(nameHeightHair));
-
-        assertEquals(expected, newJoinResults);
-    }
-
-    @Test
-    public void newRightResult_noLeftMatches() {
-        final IterativeJoin naturalJoin = new NaturalJoin();
-
-        // There are no left results.
-        final Iterator<VisibilityBindingSet> leftResults= new ArrayList<VisibilityBindingSet>().iterator();
-
-        // There is a new right result.
-        final MapBindingSet newRightResult = new MapBindingSet();
-        newRightResult.addBinding("name", vf.createLiteral("Bob"));
-
-        // Therefore, there are no new join results.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
-        assertFalse( newJoinResultsIt.hasNext() );
-    }
-
-    @Test
-    public void newRightResult_joinsWithLeftResults() {
-        final IterativeJoin naturalJoin = new NaturalJoin();
-
-        // There are a few left results that join with the new right result.
-        final MapBindingSet nameAge = new MapBindingSet();
-        nameAge.addBinding("name", vf.createLiteral("Bob"));
-        nameAge.addBinding("age", vf.createLiteral(56));
-
-        final MapBindingSet nameHair = new MapBindingSet();
-        nameHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHair.addBinding("hairColor", vf.createLiteral("Brown"));
-
-        final Iterator<VisibilityBindingSet> leftResults = Lists.<VisibilityBindingSet>newArrayList(
-                new VisibilityBindingSet(nameAge),
-                new VisibilityBindingSet(nameHair)).iterator();
-
-        // There is a new right result.
-        final MapBindingSet newRightResult = new MapBindingSet();
-        newRightResult.addBinding("name", vf.createLiteral("Bob"));
-        newRightResult.addBinding("height", vf.createLiteral("5'9\""));
-
-        // Therefore, there are a few new join results that mix the two together.
-        final Iterator<VisibilityBindingSet> newJoinResultsIt = naturalJoin.newRightResult(leftResults, new VisibilityBindingSet(newRightResult));
-
-        final Set<BindingSet> newJoinResults = new HashSet<>();
-        while(newJoinResultsIt.hasNext()) {
-            newJoinResults.add( newJoinResultsIt.next() );
-        }
-
-        final Set<BindingSet> expected = Sets.<BindingSet>newHashSet();
-        final MapBindingSet nameHeightAge = new MapBindingSet();
-        nameHeightAge.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightAge.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightAge.addBinding("age", vf.createLiteral(56));
-        expected.add(new VisibilityBindingSet(nameHeightAge));
-
-        final MapBindingSet nameHeightHair = new MapBindingSet();
-        nameHeightHair.addBinding("name", vf.createLiteral("Bob"));
-        nameHeightHair.addBinding("height", vf.createLiteral("5'9\""));
-        nameHeightHair.addBinding("hairColor", vf.createLiteral("Brown"));
-        expected.add(new VisibilityBindingSet(nameHeightHair));
-
-        assertEquals(expected, newJoinResults);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
index cc120a1..2c37462 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
@@ -24,8 +24,8 @@ import java.util.Optional;
 
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
 import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
index 1007f68..5c8a1be 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchIT.java
@@ -39,11 +39,11 @@ import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.function.join.LazyJoiningIterator.Side;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
-import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0ad2c511/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e79ef75..6b2ad61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -217,6 +217,11 @@ under the License.
             </dependency>
             <dependency>
                 <groupId>org.apache.rya</groupId>
+                <artifactId>rya.api.function</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rya</groupId>
                 <artifactId>rya.export.api</artifactId>
                 <version>${project.version}</version>
             </dependency>

Reply via email to