This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 706b8a0227 Adjust Operators to be Pausable (#13694)
706b8a0227 is described below

commit 706b8a02270e68090ebcdd67a170df0007454bd2
Author: imply-cheddar <[email protected]>
AuthorDate: Tue Jan 24 13:52:06 2023 +0900

    Adjust Operators to be Pausable (#13694)
    
    * Adjust Operators to be Pausable
    
    This enables "merge" style operations that
    combine multiple streams.
    
    This change includes a naive implementation
    of one such merge operator just to provide
    concrete evidence that the refactoring is
    effective.
---
 .../druid/collections/fastutil/DruidIntList.java   |  99 ++++
 .../collections/fastutil/DruidIntListTest.java     |  97 ++++
 .../query/operator/NaivePartitioningOperator.java  |  12 +-
 .../druid/query/operator/NaiveSortOperator.java    |  10 +-
 .../org/apache/druid/query/operator/Operator.java  | 104 +++-
 .../druid/query/operator/OperatorFactory.java      |   1 +
 .../druid/query/operator/OperatorSequence.java     |  87 +++-
 .../operator/SegmentToRowsAndColumnsOperator.java  |   6 +-
 .../druid/query/operator/SequenceOperator.java     | 107 +++-
 .../query/operator/WindowProcessorOperator.java    |  33 +-
 .../druid/query/operator/join/JoinConfig.java}     |  25 +-
 .../druid/query/operator/join/JoinPartDefn.java    |  93 ++++
 .../operator/join/SortedInnerJoinOperator.java     | 558 +++++++++++++++++++++
 .../{ => window}/WindowOperatorFactory.java        |   6 +-
 .../rowsandcols/MapOfColumnsRowsAndColumns.java    |  46 ++
 .../rowsandcols/RearrangedRowsAndColumns.java      |  38 +-
 .../column/BinarySearchableAccessor.java           |  54 ++
 .../query/rowsandcols/column/ColumnAccessor.java   |  50 +-
 .../rowsandcols/column/ConstantObjectColumn.java   | 158 +++---
 .../rowsandcols/column/DoubleArrayColumn.java      | 178 ++++---
 .../query/rowsandcols/column/IntArrayColumn.java   | 184 ++++---
 .../druid/query/rowsandcols/column/NullColumn.java |  39 +-
 .../rowsandcols/column/ObjectArrayColumn.java      | 116 ++++-
 .../semantic/DefaultSortedMatrixMaker.java         | 204 ++++++++
 .../rowsandcols/semantic/SortedMatrixMaker.java    |  83 +++
 .../druid/query/rowsandcols/util/FindResult.java   |  69 +++
 .../druid/segment/CloseableShapeshifter.java       |   3 +-
 .../druid/query/operator/ExceptionalReceiver.java  |   2 +-
 .../druid/query/operator/InlineScanOperator.java   |  53 +-
 .../druid/query/operator/OperatorSequenceTest.java | 127 ++++-
 .../druid/query/operator/OperatorTestHelper.java   |  14 +-
 .../SegmentToRowsAndColumnsOperatorTest.java       |  13 +-
 .../druid/query/operator/SequenceOperatorTest.java |  11 +-
 .../operator/SortedInnerJoinOperatorTest.java      | 144 ++++++
 .../query/operator/WindowOperatorQueryTest.java    |   9 +-
 .../operator/WindowProcessorOperatorTest.java      |   2 +-
 .../apache/druid/sql/calcite/rel/Windowing.java    |   5 +-
 37 files changed, 2478 insertions(+), 362 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java 
b/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java
new file mode 100644
index 0000000000..d732c97235
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections.fastutil;
+
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrays;
+
+public class DruidIntList extends IntArrayList
+{
+  public DruidIntList(int capacity)
+  {
+    super(capacity);
+  }
+
+  public void addArray(int[] vals)
+  {
+    grow(size + vals.length);
+    System.arraycopy(vals, 0, a, size, vals.length);
+    size += vals.length;
+  }
+
+  public void fill(int val, int count)
+  {
+    grow(size + count);
+    java.util.Arrays.fill(a, size, size + count, val);
+    size += count;
+  }
+
+  public void fillWithRepeat(int[] vals, int repeat)
+  {
+    int count = vals.length * repeat;
+    grow(size + count);
+    for (int i = 0; i < count; i += vals.length) {
+      System.arraycopy(vals, 0, a, size + i, vals.length);
+    }
+    size += count;
+  }
+
+  public void fillRuns(int[] vals, int runLength, int repeat)
+  {
+    if (runLength == 1) {
+      fillWithRepeat(vals, repeat);
+      return;
+    }
+    int count = vals.length * runLength * repeat;
+    grow(size + count);
+    for (int i = 0; i < repeat; ++i) {
+      for (int val : vals) {
+        // there's a += hidden in there, don't be tricked!
+        java.util.Arrays.fill(a, size, size += runLength, val);
+      }
+    }
+  }
+
+  public void resetToSize(int targetSize)
+  {
+    a = new int[targetSize];
+    size = 0;
+  }
+
+  /**
+   * Method gratuitously "borrowed" from IntArrayList, would've been nice if 
that method wasn't private, but ah well.
+   *
+   * @param capacity the capacity to grow to
+   * @see IntArrayList#grow(int)
+   */
+  @SuppressWarnings("ArrayEquality")
+  private void grow(int capacity)
+  {
+    if (capacity <= a.length) {
+      return;
+    }
+    if (a != IntArrays.DEFAULT_EMPTY_ARRAY) {
+      capacity = (int) Math.max(Math.min((long) a.length + (a.length >> 1), 
Arrays.MAX_ARRAY_SIZE), capacity);
+    } else if (capacity < DEFAULT_INITIAL_CAPACITY) {
+      capacity = DEFAULT_INITIAL_CAPACITY;
+    }
+    a = IntArrays.forceCapacity(a, capacity, size);
+    assert size <= a.length;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
 
b/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
new file mode 100644
index 0000000000..3dd6862867
--- /dev/null
+++ 
b/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections.fastutil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DruidIntListTest
+{
+
+  @Test
+  public void addArray()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    final int[] arrrrr = {0, 1, 2, 3};
+    intList.addArray(arrrrr);
+    expectEquals(intList, arrrrr);
+  }
+
+  @Test
+  public void fill()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    intList.fill(2, 4);
+
+    expectEquals(intList, new int[]{2, 2, 2, 2});
+  }
+
+  @Test
+  public void fillWithRepeat()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    intList.fillWithRepeat(new int[]{0, 1}, 2);
+
+    expectEquals(intList, new int[]{0, 1, 0, 1});
+  }
+
+  @Test
+  public void fillRuns()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    intList.fillRuns(new int[]{0, 1}, 2, 2);
+
+    expectEquals(intList, new int[]{0, 0, 1, 1, 0, 0, 1, 1});
+  }
+
+  @Test
+  public void fillRunsRunLengthOne()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    intList.fillRuns(new int[]{0, 1}, 1, 2);
+
+    expectEquals(intList, new int[]{0, 1, 0, 1});
+  }
+
+  @Test
+  public void resetToSize()
+  {
+    DruidIntList intList = new DruidIntList(2);
+
+    intList.fill(2, 4);
+
+    Assert.assertEquals(4, intList.size());
+    intList.resetToSize(4);
+    Assert.assertEquals(0, intList.size());
+  }
+
+  private void expectEquals(DruidIntList intList, int[] expected)
+  {
+    Assert.assertEquals(expected.length, intList.size());
+    for (int i = 0; i < expected.length; ++i) {
+      Assert.assertEquals(String.valueOf(i), expected[i], intList.getInt(i));
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
index 0e9e951418..90000a9fa6 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
@@ -23,6 +23,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
 import 
org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
 
+import java.io.Closeable;
 import java.util.Iterator;
 import java.util.List;
 
@@ -52,13 +53,14 @@ public class NaivePartitioningOperator implements Operator
   }
 
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    child.go(
+    return child.goOrContinue(
+        continuation,
         new Receiver()
         {
           @Override
-          public boolean push(RowsAndColumns rac)
+          public Signal push(RowsAndColumns rac)
           {
             ClusteredGroupPartitioner groupPartitioner = 
rac.as(ClusteredGroupPartitioner.class);
             if (groupPartitioner == null) {
@@ -67,8 +69,8 @@ public class NaivePartitioningOperator implements Operator
 
             partitionsIter = 
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
 
-            boolean keepItGoing = true;
-            while (keepItGoing && partitionsIter.hasNext()) {
+            Signal keepItGoing = Signal.GO;
+            while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
               keepItGoing = receiver.push(partitionsIter.next());
             }
             return keepItGoing;
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
index 615d18df86..e11da38421 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.operator;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 
 /**
@@ -44,22 +45,23 @@ public class NaiveSortOperator implements Operator
   }
 
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    child.go(
+    return child.goOrContinue(
+        continuation,
         new Receiver()
         {
           NaiveSortMaker.NaiveSorter sorter = null;
 
           @Override
-          public boolean push(RowsAndColumns rac)
+          public Signal push(RowsAndColumns rac)
           {
             if (sorter == null) {
               sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
             } else {
               sorter.moreData(rac);
             }
-            return true;
+            return Signal.GO;
           }
 
           @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/Operator.java 
b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
index ecc0765e74..a9a18c36d5 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
@@ -21,6 +21,9 @@ package org.apache.druid.query.operator;
 
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
 /**
  * An Operator interface that intends to have implementations that align 
relatively closely with the Operators that
  * other databases would also tend to be implemented using.  While a lot of 
Operator interfaces tend to use a
@@ -43,11 +46,100 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 public interface Operator
 {
   /**
-   * Tells the Operation to start doing its work.  Data will be pushed into 
the Receiver.
+   * Convenience method to run an Operator until completion.  Data will be 
pushed into the Receiver.  This is the
+   * primary entry point that users of Operators will call to do their work.
    *
+   * @param op       the operator to run to completion
    * @param receiver a receiver that will receive data
    */
-  void go(Receiver receiver);
+  static void go(Operator op, Receiver receiver)
+  {
+    Closeable continuation = null;
+    do {
+      continuation = op.goOrContinue(continuation, receiver);
+    } while (continuation != null);
+  }
+
+  /**
+   * This is the primary workhorse method of an Operator.  That said, users of 
Operators are not expected to use this
+   * method and instead are expected to call the static method {@link 
Operator#go(Operator, Receiver)}.
+   * <p>
+   * Data will be pushed into the Receiver.  The Receiver has the option of 
returning any of the {@link Signal} signals
+   * to indicate its degree of readiness for more data to be received.
+   * <p>
+   * If a Receiver returns a {@link Signal#PAUSE} signal, then if there is 
processing left to do, then it is expected
+   * that a non-null "continuation" object nwill be returned.  This allows for 
flow control to be returned to the
+   * caller to, e.g., process another Operator or just exert backpressure.  In 
this case, when the controller wants to
+   * resume, it must call this method again and include the continuation 
object that it received.
+   * <p>
+   * The continuation object is Closeable because it is possible that while 
processing is paused on one Operator, the
+   * processing of another Operator could obviate the need for further 
processing.  In this case, instead of resuming
+   * the paused Operation and returning {@link Signal#STOP} on the next push 
into the Receiver, the code must
+   * call {@link Closeable#close()} on the continuation object to cancel all 
further processing and clean up all
+   * related resources.  If, instead, the continuation object is passed back 
into a call to goOrContinue, then
+   * close() must <em>NOT</em> be called on the continuation object.  Said 
again, the controller must either
+   * 1) pass the continuation object back into a call to goOrContinue, OR
+   * 2) call close() on the continuation object
+   * and <em>NEVER</em> do both.
+   * <p>
+   * Once a reference to a continuation object has been passed back to a 
goOrContinue method, it should never be
+   * reused by the controller.  This is to give Operator implementations the 
ability to decide whether it makes sense
+   * to reuse the objects on subsequent calls or create new ones.
+   * <p>
+   * A null return value from this method indicates that processing is 
complete.  The Receiver should have had its
+   * {@link Receiver#completed()} method called and any resources associated 
with processing have already been cleaned
+   * up.  Additionally, if an exception escapes a call to this method, any 
resources associated with processing should have
+   * been cleaned up.
+   * <p>
+   * For implementators of the interface, if an Operator does not have any 
resources of its own to clean up, then it is
+   * safe to just pass through the continuation object to the caller.  
However, if there are resources associated with
+   * the processing that must be cleaned up, the Operator implementation must 
wrap the received Closeable in a new
+   * Closeable that will close those resources.  In this case, when the object 
comes back to the Operator on a call to
+   * goOrContinue, the Operator must unwrap the internal Closeable and pass 
that back down.  In a similar fashion,
+   * if there is any state that an Operator requires to be able to resume its 
processing, then it is expected that the
+   * Operator will cast the object back to an instance of the type that it had 
originally returned.
+   *
+   * @param receiver a receiver that will receiver data
+   * @return null if processing is complete, non-null if the Receiver returned 
a {@link Signal#PAUSE} signal
+   */
+  @Nullable
+  Closeable goOrContinue(Closeable continuationObject, Receiver receiver);
+
+  /**
+   * This is the return object from a receiver.  It is used to communicate to 
whatever is pushing the data into the
+   * receiver the state of processing.  This exists because Operators can 
sometimes decide that no more results will
+   * be needed (e.g. if the result set is being limited), in which case, they 
need some way to communicate this
+   * to downstream processing to effectively "cancel" further computation.
+   * <p>
+   * It's named weird because... well, the author had a hard time coming up 
with a meaningful name.  Suggestions
+   * for alternate names are welcome.
+   */
+  enum Signal
+  {
+    /**
+     * Indicates that the downstream processing need not do anything else.  
Operators that return this should avoid
+     * pre-emptively calling {@link Receiver#completed()} before returning 
STOP.  They should instead return STOP
+     * and trust that the downstream code will call {@link 
Receiver#completed()}.  This is because downstream code
+     * *might* be pipelining computations to prepare the next set of data and 
if the Operator first calls
+     * {@link Receiver#completed()} before communicating that no further 
results are needed, it delays the canceling
+     * of the pipelined operations and effectively wastes CPU cycles.
+     */
+    STOP,
+    /**
+     * Inidcates that the downstream processing should pause its pushing of 
results and instead return a
+     * continuation object that encapsulates whatever state is required to 
resume processing.  When this signal is
+     * received, Operators that are generating data might choose to exert 
backpressure or otherwise pause their
+     * processing efforts until called again with the returned continuation 
object.
+     * <p>
+     * If an Operator has completed its processing already when this signal is 
received, instead of returning a
+     * continuation object, it should call {@link Receiver#completed()} and 
return null.
+     */
+    PAUSE,
+    /**
+     * Indicates that more data is welcome.
+     */
+    GO
+  }
 
   interface Receiver
   {
@@ -59,15 +151,15 @@ public interface Operator
      * @return a boolean value indicating if more data will be accepted.  If 
false, push should never be called
      * anymore
      */
-    boolean push(RowsAndColumns rac);
+    Signal push(RowsAndColumns rac);
 
     /**
      * Used to indicate that no more data will ever come.  This is only used 
during the happy path and is not
-     * equivalent to a {@link java.io.Closeable#close()} method.  Namely, 
there is no guarantee that this method
+     * equivalent to a {@link Closeable#close()} method.  Namely, there is no 
guarantee that this method
      * will be called if execution halts due to an exception from push.
-     *
+     * <p>
      * It is acceptable for an implementation to eagerly close resources from 
this method, but it is not acceptable
-     * for this method to be the sole method of managing the lifecycle of 
resources held by the Operator
+     * for this method to be the sole method of managing the lifecycle of 
resources held by the Operator.
      */
     void completed();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
index 3d7db4af48..90bd30862c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.operator;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
 
 /**
  * A factory for Operators.  This class exists to encapsulate the 
user-definition of an Operator. I.e. which operator,
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
index 06fe9db8e7..9fcec5d552 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
@@ -22,20 +22,16 @@ package org.apache.druid.query.operator;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.guava.YieldingAccumulator;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.function.Supplier;
 
 /**
- * Provides a sequence on top of Operators.  The mis-match in pull (Sequence) 
and push (Operator) means that, if we
- * choose to support the Yielder interface, we have to use threading.  
Managing extra threads in order to do that
- * is unfortunate, so, we choose to take a bit of a cop-out approach.
- *
- * Specifically, the accumulate method doesn't actually have the same problem 
and the query pipeline after the merge
- * functions is composed of Sequences that all use accumulate instead of 
yielder.  Thus, if we are certain that
- * we only use the OperatorSequence in places where toYielder is not called 
(i.e. it's only used as the return
- * value of the merge() calls), then we can get away with only implementing 
the accumulate path.
+ * Provides a sequence on top of Operators.
  */
 public class OperatorSequence implements Sequence<RowsAndColumns>
 {
@@ -54,8 +50,18 @@ public class OperatorSequence implements 
Sequence<RowsAndColumns>
       Accumulator<OutType, RowsAndColumns> accumulator
   )
   {
-    final MyReceiver<OutType> receiver = new MyReceiver<>(initValue, 
accumulator);
-    opSupplier.get().go(receiver);
+    final MyReceiver<OutType> receiver = new MyReceiver<>(
+        initValue,
+        new YieldingAccumulator<OutType, RowsAndColumns>()
+        {
+          @Override
+          public OutType accumulate(OutType accumulated, RowsAndColumns in)
+          {
+            return accumulator.accumulate(accumulated, in);
+          }
+        }
+    );
+    Operator.go(opSupplier.get(), receiver);
     return receiver.getRetVal();
   }
 
@@ -65,32 +71,79 @@ public class OperatorSequence implements 
Sequence<RowsAndColumns>
       YieldingAccumulator<OutType, RowsAndColumns> accumulator
   )
   {
-    // As mentioned in the class-level javadoc, we skip this implementation 
and leave it up to the developer to
-    // only use this class in "safe" locations.
-    throw new UnsupportedOperationException("Cannot convert an Operator to a 
Yielder");
+    final Operator op = opSupplier.get();
+    final MyReceiver<OutType> receiver = new MyReceiver<>(initValue, 
accumulator);
+    final Closeable finalContinuation = op.goOrContinue(null, receiver);
+    if (finalContinuation == null && !accumulator.yielded()) {
+      // We finished processing, and the accumulator did not yield, so we 
return a done yielder with our value
+      return Yielders.done(receiver.getRetVal(), null);
+    } else {
+      return new Yielder<OutType>()
+      {
+        private Closeable continuation = finalContinuation;
+
+        @Override
+        public OutType get()
+        {
+          return receiver.getRetVal();
+        }
+
+        @Override
+        public Yielder<OutType> next(OutType initValue)
+        {
+          if (continuation == null) {
+            // This means that we completed processing on the previous run.  
In this case, we are all done
+            return Yielders.done(null, null);
+          }
+          receiver.setRetVal(initValue);
+
+          continuation = op.goOrContinue(continuation, receiver);
+          return this;
+        }
+
+        @Override
+        public boolean isDone()
+        {
+          return false;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+          if (continuation != null) {
+            continuation.close();
+          }
+        }
+      };
+    }
   }
 
   private static class MyReceiver<OutType> implements Operator.Receiver
   {
-    private final Accumulator<OutType, RowsAndColumns> accumulator;
+    private final YieldingAccumulator<OutType, RowsAndColumns> accumulator;
     private OutType retVal;
 
-    public MyReceiver(OutType initValue, Accumulator<OutType, RowsAndColumns> 
accumulator)
+    public MyReceiver(OutType initValue, YieldingAccumulator<OutType, 
RowsAndColumns> accumulator)
     {
       this.accumulator = accumulator;
       retVal = initValue;
     }
 
+    public void setRetVal(OutType retVal)
+    {
+      this.retVal = retVal;
+    }
+
     public OutType getRetVal()
     {
       return retVal;
     }
 
     @Override
-    public boolean push(RowsAndColumns rac)
+    public Operator.Signal push(RowsAndColumns rac)
     {
       retVal = accumulator.accumulate(retVal, rac);
-      return true;
+      return accumulator.yielded() ? Operator.Signal.PAUSE : 
Operator.Signal.GO;
     }
 
     @Override
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
index 67a847d81d..e60589497d 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
@@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.segment.CloseableShapeshifter;
 import org.apache.druid.segment.Segment;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 public class SegmentToRowsAndColumnsOperator implements Operator
@@ -39,7 +40,7 @@ public class SegmentToRowsAndColumnsOperator implements 
Operator
   }
 
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
     try (final CloseableShapeshifter shifty = 
segment.as(CloseableShapeshifter.class)) {
       if (shifty == null) {
@@ -50,11 +51,14 @@ public class SegmentToRowsAndColumnsOperator implements 
Operator
       if (rac == null) {
         throw new ISE("Cannot work with segment of type[%s]", 
segment.getClass());
       }
+
+      // After pushing in a single object, we are done, so ignore the signal 
and call completed()
       receiver.push(rac);
       receiver.completed();
     }
     catch (IOException e) {
       throw new RE(e, "Problem closing resources for segment[%s]", 
segment.getId());
     }
+    return null;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
index 4014a10e6c..7f42a0dbf2 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
@@ -19,11 +19,20 @@
 
 package org.apache.druid.query.operator;
 
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 public class SequenceOperator implements Operator
 {
+  private static final Logger log = new Logger(SequenceOperator.class);
+
   private final Sequence<RowsAndColumns> child;
 
   public SequenceOperator(
@@ -33,16 +42,96 @@ public class SequenceOperator implements Operator
     this.child = child;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    child.accumulate(
-        null,
-        (accumulated, in) -> {
-          receiver.push(in);
-          return accumulated;
-        }
-    );
-    receiver.completed();
+    Yielder<Signal> yielder = null;
+    final Signal theSignal;
+    if (continuation == null) {
+      yielder = child.toYielder(Signal.GO, new YieldingAccumulator<Signal, 
RowsAndColumns>()
+      {
+        @Override
+        public Signal accumulate(Signal accumulated, RowsAndColumns in)
+        {
+          final Signal pushSignal = receiver.push(in);
+          switch (pushSignal) {
+            case PAUSE:
+              this.yield();
+              return Signal.PAUSE;
+            case GO:
+              return Signal.GO;
+            case STOP:
+              this.yield();
+              return Signal.STOP;
+            default:
+              throw new ISE("How can this be happening? signal[%s]", 
pushSignal);
+          }
+        }
+      });
+      theSignal = yielder.get();
+    } else {
+      try {
+        final Yielder<Signal> castedYielder = (Yielder<Signal>) continuation;
+        if (castedYielder.isDone()) {
+          throw new ISE(
+              "The yielder is done!  The previous go call should've resulted 
in completion instead of continuation"
+          );
+        }
+        yielder = castedYielder.next(Signal.GO);
+        theSignal = yielder.get();
+      }
+      catch (ClassCastException e) {
+        try {
+          if (yielder == null) {
+            // Got the exception when casting the continuation, close the 
continuation and move on.
+            continuation.close();
+          } else {
+            // Got the exception when reading the result from the 
continuation, close the yielder and move on.
+            yielder.close();
+          }
+        }
+        catch (IOException ex) {
+          e.addSuppressed(
+              new ISE("Unable to close continuation[%s] of type[%s]", 
continuation, continuation.getClass())
+          );
+        }
+        throw e;
+      }
+    }
+
+    switch (theSignal) {
+      // We get GO from the yielder if the last push created a GO and there 
was nothing left in the sequence.
+      // I.e. we are done
+      case GO:
+      case STOP:
+        try {
+          receiver.completed();
+        }
+        catch (RuntimeException e) {
+          try {
+            yielder.close();
+          }
+          catch (IOException ioException) {
+            e.addSuppressed(ioException);
+            throw e;
+          }
+        }
+
+        try {
+          yielder.close();
+        }
+        catch (IOException e) {
+          // We got an exception when closing after we received a STOP signal 
and successfully called completed().
+          // This means that the Receiver has already done what it needs, so 
instead of throw the exception and
+          // potentially impact processing, we log instead and allow 
processing to continue.
+          log.warn(e, "Exception thrown when closing yielder.  Logging and 
ignoring because results should be fine.");
+        }
+        return null;
+      case PAUSE:
+        return yielder;
+    }
+    throw new ISE("How can this happen!? signal[%s]", theSignal);
   }
+
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
index 02bf780f55..4c4caa944c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
@@ -22,6 +22,8 @@ package org.apache.druid.query.operator;
 import org.apache.druid.query.operator.window.Processor;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.io.Closeable;
+
 /**
  * An Operator that applies a {@link Processor}, see javadoc on that interface 
for an explanation.
  */
@@ -40,21 +42,24 @@ public class WindowProcessorOperator implements Operator
   }
 
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    child.go(new Receiver()
-    {
-      @Override
-      public boolean push(RowsAndColumns rac)
-      {
-        return receiver.push(windowProcessor.process(rac));
-      }
+    return child.goOrContinue(
+        continuation,
+        new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            return receiver.push(windowProcessor.process(rac));
+          }
 
-      @Override
-      public void completed()
-      {
-        receiver.completed();
-      }
-    });
+          @Override
+          public void completed()
+          {
+            receiver.completed();
+          }
+        }
+    );
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
 b/processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
similarity index 69%
copy from 
processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
copy to 
processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
index b4c52e1950..f60e450283 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
@@ -17,21 +17,26 @@
  * under the License.
  */
 
-package org.apache.druid.query.operator;
+package org.apache.druid.query.operator.join;
 
-import org.apache.druid.query.rowsandcols.RowsAndColumns;
-
-public class ExceptionalReceiver implements Operator.Receiver
+public class JoinConfig
 {
-  @Override
-  public boolean push(RowsAndColumns rac)
+  private final int releaseSize;
+
+  public JoinConfig(
+      int releaseSize
+  )
+  {
+    this.releaseSize = releaseSize;
+  }
+
+  public int getReleaseSize()
   {
-    throw new UnsupportedOperationException();
+    return releaseSize;
   }
 
-  @Override
-  public void completed()
+  public int getBufferSize()
   {
-    throw new UnsupportedOperationException();
+    return Math.min(releaseSize, releaseSize + (releaseSize >>> 4)); // + 
1/16th
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
 
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
new file mode 100644
index 0000000000..9594175c71
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.join;
+
+import org.apache.druid.query.operator.Operator;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class JoinPartDefn
+{
+  public static Builder builder(Operator op)
+  {
+    return new Builder(op);
+  }
+
+
+  private final Operator op;
+  private final List<String> joinFields;
+  private final List<String> projectFields;
+
+  public JoinPartDefn(
+      Operator op,
+      List<String> joinFields,
+      List<String> projectFields
+  )
+  {
+    this.op = op;
+    this.joinFields = joinFields;
+    this.projectFields = projectFields;
+  }
+
+  public Operator getOp()
+  {
+    return op;
+  }
+
+  public List<String> getJoinFields()
+  {
+    return joinFields;
+  }
+
+  public List<String> getProjectFields()
+  {
+    return projectFields;
+  }
+
+  public static class Builder
+  {
+    private final Operator op;
+    private List<String> joinFields;
+    private List<String> projectFields;
+
+    public Builder(Operator op)
+    {
+      this.op = op;
+    }
+
+    public Builder joinOn(String... fields)
+    {
+      joinFields = Arrays.asList(fields);
+      return this;
+    }
+
+    public Builder project(String... fields)
+    {
+      projectFields = Arrays.asList(fields);
+      return this;
+    }
+
+    public JoinPartDefn build()
+    {
+      return new JoinPartDefn(op, joinFields, projectFields);
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
new file mode 100644
index 0000000000..2468d79f78
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.join;
+
+import org.apache.druid.collections.fastutil.DruidIntList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RearrangedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.semantic.SortedMatrixMaker;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An operator that can join the data streams from other operators.  Data must 
be provided in the same sorted
+ * order between the different operators.
+ * <p>
+ * Performs an InnerJoin based on the equality of two sets of fields.  Null is 
considered a meaningful value
+ * when comparing the two streams.  If null is intended to be excluded, it 
should be removed through a filter.
+ * <p>
+ * This class was created more as an exercise in ensuring that something 
meaningful can be made to do combinations
+ * of Operators (and ensure the interface is correct).  There are tests that 
show that this class works, but those
+ * tests are not (yet) considered exhaustive.  This paragraph in the comments 
should exist as a cautionary indication
+ * that if/when this class is dusted off for use again, there might be bugs 
yet lurking and it should likely start
+ * with fleshing out the tests.
+ */
+public class SortedInnerJoinOperator implements Operator
+{
+  private static final Logger log = new Logger(SortedInnerJoinOperator.class);
+
+  private final ArrayList<JoinPart> parts;
+  private final JoinConfig config;
+
+  public SortedInnerJoinOperator(
+      List<JoinPartDefn> partDefns,
+      JoinConfig config
+  )
+  {
+    this.parts = new ArrayList<>(partDefns.size());
+    for (JoinPartDefn partDefn : partDefns) {
+      parts.add(new JoinPart(partDefn.getOp(), partDefn.getJoinFields(), 
partDefn.getProjectFields()));
+    }
+
+    this.config = config;
+  }
+
+  @Override
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  {
+    JoinLogic joinLogic;
+    if (continuation == null) {
+      joinLogic = new JoinLogic(config, parts);
+    } else {
+      joinLogic = (JoinLogic) continuation;
+    }
+
+    try {
+      joinLogic.go(receiver);
+      switch (joinLogic.state) {
+        case NEEDS_DATA:
+        case READY:
+          throw new ISE("joinLogic.go() exited with state[%s], should never 
happen.", joinLogic.state);
+        case COMPLETE:
+          return null;
+        case PAUSED:
+          return joinLogic;
+        default:
+          throw new ISE("Unknown state[%s]", joinLogic.state);
+      }
+    }
+    catch (RuntimeException e) {
+      try {
+        joinLogic.close();
+      }
+      catch (Exception ex) {
+        e.addSuppressed(ex);
+      }
+      throw e;
+    }
+  }
+
+  private static class JoinLogic implements Closeable
+  {
+    private final JoinConfig config;
+
+    private final ArrayList<JoinPart> joinParts;
+
+    private State state;
+    private int nextPositionToLoad;
+
+    private JoinLogic(
+        JoinConfig config,
+        ArrayList<JoinPart> joinParts
+    )
+    {
+      this.config = config;
+      this.joinParts = joinParts;
+
+      setNextPositionToLoad(joinParts.size() - 1);
+    }
+
+    public void go(Receiver receiver)
+    {
+      while (state != State.COMPLETE) {
+        final int position = nextPositionToLoad;
+        final JoinPart joinPart = joinParts.get(position);
+        //noinspection VariableNotUsedInsideIf
+        if (joinPart.curr != null) {
+          throw new ISE("loading data for position[%d], but it already had 
data!?  Probably a bug!", position);
+        }
+
+        joinPart.goOrContinue(new Receiver()
+        {
+          @Override
+          public Signal push(RowsAndColumns rac)
+          {
+            joinPart.setCurr(rac);
+            process(receiver);
+
+            switch (state) {
+              case COMPLETE:
+                return Signal.STOP;
+              case NEEDS_DATA:
+                return Signal.GO;
+              case PAUSED:
+                return Signal.PAUSE;
+              case READY:
+                throw new ISE("Was in state READY after process returned!?");
+              default:
+                throw new ISE("Unknown state[%s]", state);
+            }
+          }
+
+          @Override
+          public void completed()
+          {
+            joinPart.complete.set(true);
+          }
+        });
+
+        if (joinPart.continuation == null && joinPart.needsData() && 
joinPart.isComplete()) {
+          // In this case, (1) the op returned null, (2) we don't have 
anything buffered to process and (3) completed
+          // was called.  We are done.
+          state = State.COMPLETE;
+        }
+
+        switch (state) {
+          case READY:
+            throw new ISE("Don't expect READY state here, process() should've 
changed it to something else");
+          case PAUSED:
+            return;
+          case COMPLETE:
+            receiver.completed();
+
+            try {
+              close();
+            }
+            catch (IOException e) {
+              log.warn("Problem closing a join part[%d], ignoring because we 
are done anyway.", position);
+            }
+            break;
+          case NEEDS_DATA:
+            break;
+          default:
+            throw new ISE("Unknown state[%s]", state);
+        }
+      }
+    }
+
+    /**
+     * Processes whatever it can from the buffers, pushes the created 
RowsAndColumns to the receiver and
+     * returns which side we need more data from.
+     * <p>
+     * updates {@link #nextPositionToLoad} to a positive number if there is 
another position to load.
+     * sets it to -1 if processing is complete.
+     */
+    private void process(Receiver receiver)
+    {
+      // First check that we have something to work with for all parts of the 
join
+      for (int i = joinParts.size() - 1; i >= 0; --i) {
+        final JoinPart joinPart = joinParts.get(i);
+        if (joinPart.needsData()) {
+          if (joinPart.isComplete()) {
+            state = State.COMPLETE;
+          } else {
+            setNextPositionToLoad(i);
+          }
+          return;
+        }
+      }
+      state = State.READY;
+
+      DruidIntList[] rowsToInclude = new DruidIntList[joinParts.size()];
+      for (int i = 0; i < rowsToInclude.length; ++i) {
+        rowsToInclude[i] = new DruidIntList(config.getBufferSize());
+        if (joinParts.get(i).needsData()) {
+          throw new ISE("doJoin called while joinPart[%d] needed data.  This 
is likely a bug", i);
+        }
+      }
+
+      final int finalIndex = joinParts.size() - 1;
+      final JoinPart finalPart = joinParts.get(finalIndex);
+      SortedMatrixMaker.SortedMatrix.MatrixRow row = null;
+      while (state == State.READY) {
+        if (row == null) {
+          row = finalPart.currMatrix.getRow(finalPart.currRowIndex);
+        }
+
+        row = joinRows(receiver, finalIndex, rowsToInclude, row);
+      }
+
+      pushRows(receiver, rowsToInclude);
+    }
+
+    private void pushRows(Receiver receiver, DruidIntList[] rowsToInclude)
+    {
+      int size = rowsToInclude[0].size();
+      if (size == 0) {
+        return;
+      }
+
+      LinkedHashMap<String, Column> cols = new LinkedHashMap<>();
+
+      for (int i = 0; i < joinParts.size(); ++i) {
+        final JoinPart part = joinParts.get(i);
+        RowsAndColumns remapped =
+            new RearrangedRowsAndColumns(rowsToInclude[i].elements(), 0, 
rowsToInclude[i].size(), part.curr);
+
+        for (String field : part.projectFields) {
+          cols.put(field, remapped.findColumn(field));
+        }
+      }
+
+      final Signal signal = receiver.push(new MapOfColumnsRowsAndColumns(cols, 
size));
+      switch (signal) {
+        case STOP:
+          state = State.COMPLETE;
+          break;
+        case PAUSE:
+          state = State.PAUSED;
+          break;
+        case GO:
+          break;
+        default:
+          throw new ISE("Unknown state[%s]", signal);
+      }
+    }
+
+    /**
+     * Joins rows by recursively walking the joinParts backwards.
+     * <p>
+     * Populates rowsToInclude such it represents the row mapping between the 
different parts of the join.  That is
+     * if there are 2 sides to the inner join and there are only 2 matches:
+     * 1) row 50 on part 0, and row 8 on part 1
+     * 2) row 8372 on part 0, and row 9 on part 1
+     * <p>
+     * Then rowsToInclude would be the equivalent of {@code new int[][]{{50, 
8372}, {8, 9}}}
+     * <p>
+     * If a specific part cannot match the currently provided row, then it 
will seek to the next possible row that it
+     * *could* match and return that as an alternative option.  The previous 
callers can then try to find that
+     * alternative row until either one of the parts is exhausted of data OR a 
match is found.
+     * <p>
+     * This method returns null when there is no more work that can be done on 
the current row.  This could be because
+     * a match was found and results generated, or it could be because one of 
the parts was exhausted and needs more.
+     *
+     * @param joinPartIndex the index of the join part that the current call 
should be looking at
+     * @param rowsToInclude a mapping of rows that should be included in 
results
+     * @param row           the current candidate join value to match
+     * @return null if no more work can be done, non-null if there is an 
alternative row that a part wants to search for
+     */
+    @Nullable
+    private SortedMatrixMaker.SortedMatrix.MatrixRow joinRows(
+        Receiver receiver,
+        int joinPartIndex,
+        DruidIntList[] rowsToInclude,
+        SortedMatrixMaker.SortedMatrix.MatrixRow row
+    )
+    {
+      final JoinPart joinPart = joinParts.get(joinPartIndex);
+
+      final FindResult findResult = joinPart.currMatrix.findRow(
+          joinPart.currRowIndex,
+          row
+      );
+
+      if (findResult.wasFound()) {
+        joinPart.currRowIndex = findResult.getStartRow();
+        joinPart.scanToRowIndex = findResult.getEndRow();
+
+        if (joinPartIndex == 0) {
+          // We have walked through all of the joinParts and have matches, so 
time to add to rowsToInclude
+
+          // We have ranges in each of the parts, we will do the cartesian 
product, so we will produce the product
+          // of the length of those ranges number of rows.  Let's compute it 
to see how many rows we will produce.
+          int numRowsExpected = joinPart.scanToRowIndex - 
joinPart.currRowIndex;
+          for (int i = 1; i < joinParts.size(); ++i) {
+            final JoinPart subPart = joinParts.get(i);
+            numRowsExpected *= subPart.scanToRowIndex - subPart.currRowIndex;
+          }
+
+          if (numRowsExpected == 1) {
+            for (int i = 0; i < joinParts.size(); ++i) {
+              rowsToInclude[i].add(joinParts.get(i).currRowIndex);
+            }
+          } else {
+            if (numRowsExpected > 1_000_000) {
+              // It would be helpful to serialize the actual value out with 
this error, but that risks leaking data
+              throw new IAE("Got a join, with a cartesian product that exceeds 
1,000,000 rows, cannot handle it");
+            }
+
+            // The rowIds that will be used in the result of the join will be 
a cartesian product, which means that
+            // the "deepest" row ids will be repeated in-order over and over, 
then the next layer will have each
+            // value repeated runSize times, forming a new run of length * 
runSize, and so on and so forth
+            int partIndex = joinParts.size() - 1;
+            int runSize = 1;
+            // We are guaranteed that there is a runSize greater than 1 
because otherwise numRowsExpected would be 1
+            while (partIndex >= 0) {
+              final JoinPart part = joinParts.get(partIndex);
+              final int size = part.scanToRowIndex - part.currRowIndex;
+              if (size == 1) {
+                rowsToInclude[partIndex].fill(part.currRowIndex, 
numRowsExpected);
+              } else {
+                int[] vals = new int[size];
+                for (int i = 0; i < vals.length; ++i) {
+                  vals[i] = i + part.currRowIndex;
+                }
+                final int newRunSize = size * runSize;
+                rowsToInclude[partIndex].fillRuns(vals, runSize, 
numRowsExpected / newRunSize);
+                runSize = newRunSize;
+              }
+
+              --partIndex;
+            }
+          }
+
+          if (rowsToInclude[0].size() > config.getReleaseSize()) {
+            // Incrementally push stuff out once we've collected this number 
of rows
+            pushRows(receiver, rowsToInclude);
+            if (state == State.READY) {
+              // We have more to do, so reinitialize rowsToInclude
+              for (DruidIntList intList : rowsToInclude) {
+                intList.resetToSize(config.getBufferSize());
+              }
+            }
+          }
+
+          return null;
+        } else {
+          // Continue the march through the joinParts
+          final SortedMatrixMaker.SortedMatrix.MatrixRow alternativeRow =
+              joinRows(receiver, joinPartIndex - 1, rowsToInclude, row);
+
+          if (consumeCurrMaybePush(receiver, joinPartIndex, rowsToInclude, 
joinPart)) {
+            return null;
+          }
+
+          if (alternativeRow == null) {
+            return null;
+          } else {
+            // The next part didn't have the row we wanted, so let's check for 
the candidate that it returned
+            final FindResult alternativeFind =
+                joinPart.currMatrix.findRow(joinPart.currRowIndex, 
alternativeRow);
+
+            if (alternativeFind == null) {
+              joinPart.reinitCurr();
+              setNextPositionToLoad(joinPartIndex);
+              return null;
+            } else if (alternativeFind.wasFound()) {
+              // We update our values to make finding it again easier
+              joinPart.currRowIndex = alternativeFind.getStartRow();
+              joinPart.scanToRowIndex = alternativeFind.getEndRow();
+              return alternativeRow;
+            } else {
+              if (consumeCurrMaybePush(receiver, joinPartIndex, rowsToInclude, 
joinPart)) {
+                return null;
+              } else {
+                return joinPart.currMatrix.getRow(joinPart.currRowIndex);
+              }
+            }
+          }
+        }
+      } else {
+        final int next = findResult.getNext();
+        if (next >= joinPart.currMatrix.numRows()) {
+          // the next item is beyond the current RAC, so we need to load more 
data.
+          joinPart.reinitCurr();
+          setNextPositionToLoad(joinPartIndex);
+          return null;
+        }
+        // No match, so return the next possible match such that the caller 
can seek to that.
+        return joinPart.currMatrix.getRow(next);
+      }
+    }
+
+    private boolean consumeCurrMaybePush(
+        Receiver receiver,
+        int joinPartIndex,
+        DruidIntList[] rowsToInclude,
+        JoinPart joinPart
+    )
+    {
+      // Consume the current range of data by jumping to the end
+      joinPart.jumpTo(joinPart.scanToRowIndex);
+      if (joinPart.needsData()) {
+        // If we need more data, we must first push out anything we've built 
up so far
+        // before we drop the reference to the RAC
+        pushRows(receiver, rowsToInclude);
+        for (DruidIntList intList : rowsToInclude) {
+          intList.clear();
+        }
+        // We ran out of data, so we need to just mark that we need data and 
return
+        if (joinPart.isComplete()) {
+          state = State.COMPLETE;
+          joinPart.reinitCurr();
+        } else {
+          setNextPositionToLoad(joinPartIndex);
+        }
+        return true;
+      }
+      return false;
+    }
+
+    private void setNextPositionToLoad(int joinPartIndex)
+    {
+      state = State.NEEDS_DATA;
+      nextPositionToLoad = joinPartIndex;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      Closer closer = Closer.create();
+      closer.registerAll(joinParts);
+      closer.close();
+    }
+  }
+
+  private enum State
+  {
+    NEEDS_DATA,
+    COMPLETE,
+    PAUSED,
+    READY
+  }
+
+  private static class JoinPart implements Closeable
+  {
+    private final Operator op;
+    private final List<String> joinFields;
+    private final List<String> projectFields;
+    private final AtomicBoolean complete;
+
+    private RowsAndColumns curr;
+    private SortedMatrixMaker.SortedMatrix currMatrix;
+    private int currRowIndex;
+    private int scanToRowIndex;
+
+    private Closeable continuation;
+
+    private JoinPart(
+        Operator op,
+        List<String> joinFields,
+        List<String> projectFields
+    )
+    {
+      this.op = op;
+      this.joinFields = joinFields;
+      this.projectFields = projectFields;
+
+      complete = new AtomicBoolean(false);
+      reinitCurr();
+      continuation = null;
+    }
+
+    public void setCurr(RowsAndColumns rac)
+    {
+      //noinspection VariableNotUsedInsideIf
+      if (curr != null) {
+        throw new ISE("Asked to setCurr even though it was not null!?");
+      }
+
+      curr = rac;
+      currMatrix = SortedMatrixMaker.fromRAC(rac).make(joinFields);
+      jumpTo(0);
+    }
+
+    public boolean needsData()
+    {
+      return curr == null || currRowIndex >= currMatrix.numRows();
+    }
+
+    public boolean isComplete()
+    {
+      return complete.get();
+    }
+
+    public void goOrContinue(Receiver receiver)
+    {
+      Closeable theContinuation = continuation;
+      continuation = null;
+      continuation = op.goOrContinue(theContinuation, receiver);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      if (continuation != null) {
+        continuation.close();
+      }
+    }
+
+    /**
+     * @param rowIndex the rowIndex to jump to
+     */
+    public void jumpTo(int rowIndex)
+    {
+      currRowIndex = rowIndex;
+      scanToRowIndex = -1;
+    }
+
+    public void reinitCurr()
+    {
+      curr = null;
+      currMatrix = null;
+      currRowIndex = -1;
+      scanToRowIndex = -1;
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
 
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
similarity index 89%
rename from 
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
rename to 
processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
index bc4cd5206c..ea44001d50 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-package org.apache.druid.query.operator;
+package org.apache.druid.query.operator.window;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.query.operator.window.Processor;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowProcessorOperator;
 
 public class WindowOperatorFactory implements OperatorFactory
 {
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index e320ac1999..6ce7769ac9 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -22,14 +22,24 @@ package org.apache.druid.query.rowsandcols;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
+import org.apache.druid.segment.column.ColumnType;
 
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
 public class MapOfColumnsRowsAndColumns implements RowsAndColumns
 {
+  public static MapOfColumnsRowsAndColumns.Builder builder()
+  {
+    return new Builder();
+  }
+
   public static MapOfColumnsRowsAndColumns of(String name, Column col)
   {
     return fromMap(ImmutableMap.of(name, col));
@@ -109,4 +119,40 @@ public class MapOfColumnsRowsAndColumns implements 
RowsAndColumns
     return null;
   }
 
+  public static class Builder
+  {
+    public LinkedHashMap<String, Column> cols = new LinkedHashMap<>();
+
+    public Builder add(String name, int[] vals)
+    {
+      return add(name, new IntArrayColumn(vals));
+    }
+
+    public Builder add(String name, double[] vals)
+    {
+      return add(name, new DoubleArrayColumn(vals));
+    }
+
+    public Builder add(String name, ColumnType type, Object... vals)
+    {
+      return add(name, vals, type);
+    }
+
+    public Builder add(String name, Object[] vals, ColumnType type)
+    {
+      return add(name, new ObjectArrayColumn(vals, type));
+    }
+
+    public Builder add(String name, Column col)
+    {
+      cols.put(name, col);
+      return this;
+    }
+
+    public MapOfColumnsRowsAndColumns build()
+    {
+      return MapOfColumnsRowsAndColumns.fromMap(cols);
+    }
+  }
+
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
index 3476bab60c..f1793f8fd0 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
@@ -49,17 +49,30 @@ public class RearrangedRowsAndColumns implements 
RowsAndColumns
 
   private final int[] pointers;
   private final RowsAndColumns rac;
+  private final int start;
+  private final int end;
 
   public RearrangedRowsAndColumns(
       int[] pointers,
       RowsAndColumns rac
   )
   {
-    if (pointers.length != rac.numRows()) {
-      throw new IAE("length mismatch, pointers[%,d], rac[%,d]", 
pointers.length, rac.numRows());
-    }
+    this(pointers, 0, pointers.length, rac);
+  }
 
+  public RearrangedRowsAndColumns(
+      int[] pointers,
+      int start,
+      int end,
+      RowsAndColumns rac
+  )
+  {
+    if (end - start < 0 || end > pointers.length) {
+      throw new IAE("end[%,d] - start[%,d] was invalid!? 
pointers.length[%,d]", end, start, pointers.length);
+    }
     this.pointers = pointers;
+    this.start = start;
+    this.end = end;
     this.rac = rac;
   }
 
@@ -72,13 +85,14 @@ public class RearrangedRowsAndColumns implements 
RowsAndColumns
   @Override
   public int numRows()
   {
-    return pointers.length;
+    return end - start;
   }
 
   @Override
   @Nullable
   public Column findColumn(String name)
   {
+    // We do a containsKey here so that we can negative-cache nulls.
     if (columnCache.containsKey(name)) {
       return columnCache.get(name);
     } else {
@@ -101,50 +115,50 @@ public class RearrangedRowsAndColumns implements 
RowsAndColumns
             @Override
             public int numRows()
             {
-              return pointers.length;
+              return end - start;
             }
 
             @Override
             public boolean isNull(int rowNum)
             {
-              return accessor.isNull(pointers[rowNum]);
+              return accessor.isNull(pointers[start + rowNum]);
             }
 
             @Nullable
             @Override
             public Object getObject(int rowNum)
             {
-              return accessor.getObject(pointers[rowNum]);
+              return accessor.getObject(pointers[start + rowNum]);
             }
 
             @Override
             public double getDouble(int rowNum)
             {
-              return accessor.getDouble(pointers[rowNum]);
+              return accessor.getDouble(pointers[start + rowNum]);
             }
 
             @Override
             public float getFloat(int rowNum)
             {
-              return accessor.getFloat(pointers[rowNum]);
+              return accessor.getFloat(pointers[start + rowNum]);
             }
 
             @Override
             public long getLong(int rowNum)
             {
-              return accessor.getLong(pointers[rowNum]);
+              return accessor.getLong(pointers[start + rowNum]);
             }
 
             @Override
             public int getInt(int rowNum)
             {
-              return accessor.getInt(pointers[rowNum]);
+              return accessor.getInt(pointers[start + rowNum]);
             }
 
             @Override
             public int compareRows(int lhsRowNum, int rhsRowNum)
             {
-              return accessor.compareRows(pointers[lhsRowNum], 
pointers[rhsRowNum]);
+              return accessor.compareRows(pointers[lhsRowNum], pointers[start 
+ rhsRowNum]);
             }
           }
       );
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
new file mode 100644
index 0000000000..4eddcc77f1
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.column;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+public interface BinarySearchableAccessor extends ColumnAccessor
+{
+  static BinarySearchableAccessor fromColumn(Column col)
+  {
+    final BinarySearchableAccessor retVal = 
col.as(BinarySearchableAccessor.class);
+
+    if (retVal == null) {
+      final ColumnAccessor accessor = col.toAccessor();
+      if (accessor instanceof BinarySearchableAccessor) {
+        // Why didn't they just return it from the as()!?!?!  Who knows, ah 
well.
+        return (BinarySearchableAccessor) accessor;
+      }
+
+      throw new ISE("col[%s] is a no-go", col.getClass());
+    }
+    return retVal;
+  }
+
+  FindResult findNull(int startIndex, int endIndex);
+
+  FindResult findDouble(int startIndex, int endIndex, double val);
+
+  FindResult findFloat(int startIndex, int endIndex, float val);
+
+  FindResult findLong(int startIndex, int endIndex, long val);
+
+  FindResult findString(int startIndex, int endIndex, String val);
+
+  FindResult findComplex(int startIndex, int endIndex, Object val);
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
index 9f5959f185..4e84c398a8 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
@@ -24,7 +24,7 @@ import org.apache.druid.segment.column.ColumnType;
 import javax.annotation.Nullable;
 
 /**
- * Allows for accessing a column, provides methods to enable cell-by-cell 
access.
+ * Allows for accessing a column, provides methods to enable row-by-row access 
of a specific column.
  */
 public interface ColumnAccessor
 {
@@ -36,69 +36,69 @@ public interface ColumnAccessor
   ColumnType getType();
 
   /**
-   * Get the number of cells
+   * Get the number of rows
    *
-   * @return the number of cells
+   * @return the number of rows
    */
   int numRows();
 
   /**
-   * Get whether the value of a cell is null
+   * Get whether the value of a row is null
    *
-   * @param rowNum the cell id, 0-indexed
+   * @param rowNum the row id, 0-indexed
    * @return true if the value is null
    */
   boolean isNull(int rowNum);
 
   /**
-   * Get the {@link Object} representation of the cell.
+   * Get the {@link Object} representation of the row.
    *
-   * @param rowNum the cell id, 0-indexed
-   * @return the {@link Object} representation of the cell.  Returns {@code 
null} If {@link #isNull} is true.
+   * @param rowNum the row id, 0-indexed
+   * @return the {@link Object} representation of the row.  Returns {@code 
null} If {@link #isNull} is true.
    */
   @Nullable
   Object getObject(int rowNum);
 
   /**
-   * Get the primitive {@code double} representation of the cell.
+   * Get the primitive {@code double} representation of the row.
    *
-   * @param rowNum the cell id, 0-indexed
-   * @return the primitive {@code double} representation of the cell.  Returns 
{@code 0D} If {@link #isNull} is true.
+   * @param rowNum the row id, 0-indexed
+   * @return the primitive {@code double} representation of the row.  Returns 
{@code 0D} If {@link #isNull} is true.
    */
   double getDouble(int rowNum);
 
   /**
-   * Get the primitive {@code float} representation of the cell.
+   * Get the primitive {@code float} representation of the row.
    *
-   * @param rowNum the cell id, 0-indexed
-   * @return the primitive {@code float} representation of the cell.  Returns 
{@code 0F} If {@link #isNull} is true.
+   * @param rowNum the row id, 0-indexed
+   * @return the primitive {@code float} representation of the row.  Returns 
{@code 0F} If {@link #isNull} is true.
    */
   float getFloat(int rowNum);
 
   /**
-   * Get the primitive {@code long} representation of the cell.
+   * Get the primitive {@code long} representation of the row.
    *
-   * @param rowNum the cell id, 0-indexed
-   * @return the primitive {@code long} representation of the cell.  Returns 
{@code 0L} If {@link #isNull} is true.
+   * @param rowNum the row id, 0-indexed
+   * @return the primitive {@code long} representation of the row.  Returns 
{@code 0L} If {@link #isNull} is true.
    */
   long getLong(int rowNum);
 
   /**
-   * Get the primitive {@code int} representation of the cell.
+   * Get the primitive {@code int} representation of the row.
    *
-   * @param rowNum the cell id, 0-indexed
-   * @return the primitive {@code int} representation of the cell.  Returns 
{@code 0} If {@link #isNull} is true.
+   * @param rowNum the row id, 0-indexed
+   * @return the primitive {@code int} representation of the row.  Returns 
{@code 0} If {@link #isNull} is true.
    */
   int getInt(int rowNum);
 
   /**
-   * Compares two cells using a comparison that follows the same semantics as 
{@link java.util.Comparator#compare}
+   * Compares two rows using a comparison that follows the same semantics as 
{@link java.util.Comparator#compare}
    * <p>
-   * This is not comparing the cell Ids, but the values referred to by the 
cell ids.
+   * This is not comparing the row Ids, but the values referred to by the row 
ids.
    *
-   * @param lhsRowNum the cell id of the left-hand-side of the comparison
-   * @param rhsRowNum the cell id of the right-hand-side of the comparison
-   * @return the result of the comparison of the two cells
+   * @param lhsRowNum the row id of the left-hand-side of the comparison
+   * @param rhsRowNum the row id of the right-hand-side of the comparison
+   * @return the result of the comparison of the two rows
    */
   int compareRows(int lhsRowNum, int rhsRowNum);
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
index d054472f3d..28a7c3dd10 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.rowsandcols.column;
 
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.util.FindResult;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nonnull;
@@ -43,62 +44,7 @@ public class ConstantObjectColumn implements Column
   @Override
   public ColumnAccessor toAccessor()
   {
-    return new ColumnAccessor()
-    {
-      @Override
-      public ColumnType getType()
-      {
-        return type;
-      }
-
-      @Override
-      public int numRows()
-      {
-        return numRows;
-      }
-
-      @Override
-      public boolean isNull(int rowNum)
-      {
-        return obj == null;
-      }
-
-      @Override
-      public Object getObject(int rowNum)
-      {
-        return obj;
-      }
-
-      @Override
-      public double getDouble(int rowNum)
-      {
-        return ((Number) obj).doubleValue();
-      }
-
-      @Override
-      public float getFloat(int rowNum)
-      {
-        return ((Number) obj).floatValue();
-      }
-
-      @Override
-      public long getLong(int rowNum)
-      {
-        return ((Number) obj).longValue();
-      }
-
-      @Override
-      public int getInt(int rowNum)
-      {
-        return ((Number) obj).intValue();
-      }
-
-      @Override
-      public int compareRows(int lhsRowNum, int rhsRowNum)
-      {
-        return 0;
-      }
-    };
+    return new ConstantColumnAccessor();
   }
 
   @Nullable
@@ -121,4 +67,104 @@ public class ConstantObjectColumn implements Column
 
     return null;
   }
+
+  private class ConstantColumnAccessor implements BinarySearchableAccessor
+  {
+    @Override
+    public ColumnType getType()
+    {
+      return type;
+    }
+
+    @Override
+    public int numRows()
+    {
+      return numRows;
+    }
+
+    @Override
+    public boolean isNull(int rowNum)
+    {
+      return obj == null;
+    }
+
+    @Override
+    public Object getObject(int rowNum)
+    {
+      return obj;
+    }
+
+    @Override
+    public double getDouble(int rowNum)
+    {
+      return ((Number) obj).doubleValue();
+    }
+
+    @Override
+    public float getFloat(int rowNum)
+    {
+      return ((Number) obj).floatValue();
+    }
+
+    @Override
+    public long getLong(int rowNum)
+    {
+      return ((Number) obj).longValue();
+    }
+
+    @Override
+    public int getInt(int rowNum)
+    {
+      return ((Number) obj).intValue();
+    }
+
+    @Override
+    public int compareRows(int lhsRowNum, int rhsRowNum)
+    {
+      return 0;
+    }
+
+    @Override
+    public FindResult findNull(int startIndex, int endIndex)
+    {
+      return findComplex(startIndex, endIndex, null);
+    }
+
+    @Override
+    public FindResult findDouble(int startIndex, int endIndex, double val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findFloat(int startIndex, int endIndex, float val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findLong(int startIndex, int endIndex, long val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findString(int startIndex, int endIndex, String val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findComplex(int startIndex, int endIndex, Object val)
+    {
+      final boolean same;
+      if (obj == null) {
+        same = val == null;
+      } else {
+        same = obj.equals(val);
+      }
+
+      return same ? FindResult.found(startIndex, endIndex) : 
FindResult.notFound(endIndex);
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
index c87e69f500..9c3b799d30 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
@@ -20,10 +20,13 @@
 package org.apache.druid.query.rowsandcols.column;
 
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.rowsandcols.util.FindResult;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Arrays;
 
 public class DoubleArrayColumn implements Column
 {
@@ -40,62 +43,7 @@ public class DoubleArrayColumn implements Column
   @Override
   public ColumnAccessor toAccessor()
   {
-    return new ColumnAccessor()
-    {
-      @Override
-      public ColumnType getType()
-      {
-        return ColumnType.DOUBLE;
-      }
-
-      @Override
-      public int numRows()
-      {
-        return vals.length;
-      }
-
-      @Override
-      public boolean isNull(int rowNum)
-      {
-        return false;
-      }
-
-      @Override
-      public Object getObject(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public double getDouble(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public float getFloat(int rowNum)
-      {
-        return (float) vals[rowNum];
-      }
-
-      @Override
-      public long getLong(int rowNum)
-      {
-        return (long) vals[rowNum];
-      }
-
-      @Override
-      public int getInt(int rowNum)
-      {
-        return (int) vals[rowNum];
-      }
-
-      @Override
-      public int compareRows(int lhsRowNum, int rhsRowNum)
-      {
-        return Double.compare(lhsRowNum, rhsRowNum);
-      }
-    };
+    return new MyColumnAccessor();
   }
 
   @Nullable
@@ -126,4 +74,122 @@ public class DoubleArrayColumn implements Column
     }
     return null;
   }
+
+  private class MyColumnAccessor implements BinarySearchableAccessor
+  {
+    @Override
+    public ColumnType getType()
+    {
+      return ColumnType.DOUBLE;
+    }
+
+    @Override
+    public int numRows()
+    {
+      return vals.length;
+    }
+
+    @Override
+    public boolean isNull(int rowNum)
+    {
+      return false;
+    }
+
+    @Override
+    public Object getObject(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public double getDouble(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public float getFloat(int rowNum)
+    {
+      return (float) vals[rowNum];
+    }
+
+    @Override
+    public long getLong(int rowNum)
+    {
+      return (long) vals[rowNum];
+    }
+
+    @Override
+    public int getInt(int rowNum)
+    {
+      return (int) vals[rowNum];
+    }
+
+    @Override
+    public int compareRows(int lhsRowNum, int rhsRowNum)
+    {
+      return Double.compare(lhsRowNum, rhsRowNum);
+    }
+
+    @Override
+    public FindResult findNull(int startIndex, int endIndex)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findDouble(int startIndex, int endIndex, double val)
+    {
+      if (vals[startIndex] == val) {
+        int end = startIndex + 1;
+
+        while (end < endIndex && vals[end] == val) {
+          ++end;
+        }
+        return FindResult.found(startIndex, end);
+      }
+
+      int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
+      if (i > 0) {
+        int foundStart = i;
+        int foundEnd = i + 1;
+
+        while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
+          --foundStart;
+        }
+
+        while (foundEnd < endIndex && vals[foundEnd] == val) {
+          ++foundEnd;
+        }
+
+        return FindResult.found(foundStart, foundEnd);
+      } else {
+        return FindResult.notFound(-(i + 1));
+      }
+    }
+
+    @Override
+    public FindResult findFloat(int startIndex, int endIndex, float val)
+    {
+      return findDouble(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findLong(int startIndex, int endIndex, long val)
+    {
+      return findDouble(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findString(int startIndex, int endIndex, String val)
+    {
+      return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+    }
+
+    @Override
+    public FindResult findComplex(int startIndex, int endIndex, Object val)
+    {
+      return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
index ee5a7d7604..673cebf0e2 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
@@ -20,10 +20,13 @@
 package org.apache.druid.query.rowsandcols.column;
 
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.rowsandcols.util.FindResult;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Arrays;
 
 public class IntArrayColumn implements Column
 {
@@ -40,62 +43,7 @@ public class IntArrayColumn implements Column
   @Override
   public ColumnAccessor toAccessor()
   {
-    return new ColumnAccessor()
-    {
-      @Override
-      public ColumnType getType()
-      {
-        return ColumnType.LONG;
-      }
-
-      @Override
-      public int numRows()
-      {
-        return vals.length;
-      }
-
-      @Override
-      public boolean isNull(int rowNum)
-      {
-        return false;
-      }
-
-      @Override
-      public Object getObject(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public double getDouble(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public float getFloat(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public long getLong(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public int getInt(int rowNum)
-      {
-        return vals[rowNum];
-      }
-
-      @Override
-      public int compareRows(int lhsRowNum, int rhsRowNum)
-      {
-        return Integer.compare(vals[lhsRowNum], vals[rhsRowNum]);
-      }
-    };
+    return new MyColumnAccessor();
   }
 
   @Nullable
@@ -126,4 +74,128 @@ public class IntArrayColumn implements Column
     }
     return null;
   }
+
+  private class MyColumnAccessor implements BinarySearchableAccessor
+  {
+    @Override
+    public ColumnType getType()
+    {
+      return ColumnType.LONG;
+    }
+
+    @Override
+    public int numRows()
+    {
+      return vals.length;
+    }
+
+    @Override
+    public boolean isNull(int rowNum)
+    {
+      return false;
+    }
+
+    @Override
+    public Object getObject(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public double getDouble(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public float getFloat(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public long getLong(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public int getInt(int rowNum)
+    {
+      return vals[rowNum];
+    }
+
+    @Override
+    public int compareRows(int lhsRowNum, int rhsRowNum)
+    {
+      return Integer.compare(vals[lhsRowNum], vals[rhsRowNum]);
+    }
+
+
+    @Override
+    public FindResult findNull(int startIndex, int endIndex)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findDouble(int startIndex, int endIndex, double val)
+    {
+      return findInt(startIndex, endIndex, (int) val);
+    }
+
+    @Override
+    public FindResult findFloat(int startIndex, int endIndex, float val)
+    {
+      return findInt(startIndex, endIndex, (int) val);
+    }
+
+    @Override
+    public FindResult findLong(int startIndex, int endIndex, long val)
+    {
+      return findInt(startIndex, endIndex, (int) val);
+    }
+
+    public FindResult findInt(int startIndex, int endIndex, int val)
+    {
+      if (vals[startIndex] == val) {
+        int end = startIndex + 1;
+
+        while (end < endIndex && vals[end] == val) {
+          ++end;
+        }
+        return FindResult.found(startIndex, end);
+      }
+
+      int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
+      if (i > 0) {
+        int foundStart = i;
+        int foundEnd = i + 1;
+
+        while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
+          --foundStart;
+        }
+
+        while (foundEnd < endIndex && vals[foundEnd] == val) {
+          ++foundEnd;
+        }
+
+        return FindResult.found(foundStart, foundEnd);
+      } else {
+        return FindResult.notFound(-(i + 1));
+      }
+    }
+
+    @Override
+    public FindResult findString(int startIndex, int endIndex, String val)
+    {
+      return findInt(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
+    }
+
+    @Override
+    public FindResult findComplex(int startIndex, int endIndex, Object val)
+    {
+      return findDouble(startIndex, endIndex, (int) Numbers.tryParseLong(val, 
0));
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
index 73823534be..36a5046c1f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.rowsandcols.column;
 
+import org.apache.druid.query.rowsandcols.util.FindResult;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nonnull;
@@ -52,7 +53,7 @@ public class NullColumn implements Column
     return null;
   }
 
-  public static class Accessor implements ColumnAccessor
+  public static class Accessor implements BinarySearchableAccessor
   {
     private final ColumnType type;
     private final int size;
@@ -117,5 +118,41 @@ public class NullColumn implements Column
     {
       return 0;
     }
+
+    @Override
+    public FindResult findNull(int startIndex, int endIndex)
+    {
+      return FindResult.found(startIndex, endIndex);
+    }
+
+    @Override
+    public FindResult findDouble(int startIndex, int endIndex, double val)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findFloat(int startIndex, int endIndex, float val)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findLong(int startIndex, int endIndex, long val)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findString(int startIndex, int endIndex, String val)
+    {
+      return FindResult.notFound(endIndex);
+    }
+
+    @Override
+    public FindResult findComplex(int startIndex, int endIndex, Object val)
+    {
+      return FindResult.notFound(endIndex);
+    }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
index 46d875286a..211c176902 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.query.rowsandcols.column;
 
+import org.apache.druid.query.rowsandcols.util.FindResult;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.Comparator;
 
 public class ObjectArrayColumn implements Column
@@ -59,32 +61,7 @@ public class ObjectArrayColumn implements Column
   @Override
   public ColumnAccessor toAccessor()
   {
-    return new ObjectColumnAccessorBase()
-    {
-      @Override
-      protected Object getVal(int rowNum)
-      {
-        return objects[rowNum];
-      }
-
-      @Override
-      protected Comparator<Object> getComparator()
-      {
-        return comparator;
-      }
-
-      @Override
-      public ColumnType getType()
-      {
-        return resultType;
-      }
-
-      @Override
-      public int numRows()
-      {
-        return objects.length;
-      }
-    };
+    return new MyColumnAccessor();
   }
 
   @Nullable
@@ -105,4 +82,91 @@ public class ObjectArrayColumn implements Column
     return null;
   }
 
+  private class MyColumnAccessor extends ObjectColumnAccessorBase implements 
BinarySearchableAccessor
+  {
+    @Override
+    protected Object getVal(int rowNum)
+    {
+      return objects[rowNum];
+    }
+
+    @Override
+    protected Comparator<Object> getComparator()
+    {
+      return comparator;
+    }
+
+    @Override
+    public ColumnType getType()
+    {
+      return resultType;
+    }
+
+    @Override
+    public int numRows()
+    {
+      return objects.length;
+    }
+
+    @Override
+    public FindResult findNull(int startIndex, int endIndex)
+    {
+      return findComplex(startIndex, endIndex, null);
+    }
+
+    @Override
+    public FindResult findDouble(int startIndex, int endIndex, double val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findFloat(int startIndex, int endIndex, float val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findLong(int startIndex, int endIndex, long val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findString(int startIndex, int endIndex, String val)
+    {
+      return findComplex(startIndex, endIndex, val);
+    }
+
+    @Override
+    public FindResult findComplex(int startIndex, int endIndex, Object val)
+    {
+      if (comparator.compare(objects[startIndex], val) == 0) {
+        int end = startIndex + 1;
+
+        while (end < endIndex && comparator.compare(objects[end], val) == 0) {
+          ++end;
+        }
+        return FindResult.found(startIndex, end);
+      }
+
+      int i = Arrays.binarySearch(objects, startIndex, endIndex, val, 
comparator);
+      if (i > 0) {
+        int foundStart = i;
+        int foundEnd = i + 1;
+
+        while (foundStart - 1 >= startIndex && 
comparator.compare(objects[foundStart - 1], val) == 0) {
+          --foundStart;
+        }
+
+        while (foundEnd < endIndex && comparator.compare(objects[foundEnd], 
val) == 0) {
+          ++foundEnd;
+        }
+
+        return FindResult.found(foundStart, foundEnd);
+      } else {
+        return FindResult.notFound(-(i + 1));
+      }
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
new file mode 100644
index 0000000000..d08b6427bc
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.BinarySearchableAccessor;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The DefaultSortedMatrixMaker is a SortedMatrixMaker that works on the 
generic RowsAndColumns interface.
+ * <p>
+ * It does not validate that things are sorted, it just assumes that they must 
be.  As such, behavior are undefined
+ * when running on top of a RowsAndColumns that is not actually sorted by the 
columns passed into {@link #make}
+ */
+public class DefaultSortedMatrixMaker implements SortedMatrixMaker
+{
+  private final RowsAndColumns rac;
+
+  public DefaultSortedMatrixMaker(
+      RowsAndColumns rac
+  )
+  {
+    this.rac = rac;
+  }
+
+  @Override
+  public SortedMatrix make(List<String> columns)
+  {
+    ArrayList<BinarySearchableAccessor> soughtColumns = new 
ArrayList<>(columns.size());
+    for (String column : columns) {
+      // Note, this can add `null` to the list.  That's intentional, 
iterations over this list must deal
+      // with null entries meaning that the column doesn't exist
+      final Column racColumn = rac.findColumn(column);
+      if (racColumn == null) {
+        soughtColumns.add(null);
+      } else {
+        soughtColumns.add(BinarySearchableAccessor.fromColumn(racColumn));
+      }
+    }
+
+    return new SortedMatrix()
+    {
+      @Override
+      public int numRows()
+      {
+        return rac.numRows();
+      }
+
+      @Override
+      public MatrixRow getRow(int rowId)
+      {
+        return new MatrixRow()
+        {
+          @Override
+          public int length()
+          {
+            return soughtColumns.size();
+          }
+
+          @Override
+          public boolean isNull(int columnId)
+          {
+            final BinarySearchableAccessor column = 
soughtColumns.get(columnId);
+            if (column == null) {
+              return true;
+            } else {
+              return column.isNull(rowId);
+            }
+          }
+
+          @Override
+          public Object getObject(int columnId)
+          {
+            final BinarySearchableAccessor column = 
soughtColumns.get(columnId);
+            if (column == null) {
+              return null;
+            } else {
+              return column.getObject(rowId);
+            }
+          }
+
+          @Override
+          public double getDouble(int columnId)
+          {
+            final BinarySearchableAccessor column = 
soughtColumns.get(columnId);
+            if (column == null) {
+              return 0;
+            } else {
+              return column.getDouble(rowId);
+            }
+          }
+
+          @Override
+          public float getFloat(int columnId)
+          {
+            final BinarySearchableAccessor column = 
soughtColumns.get(columnId);
+            if (column == null) {
+              return 0;
+            } else {
+              return column.getFloat(rowId);
+            }
+          }
+
+          @Override
+          public long getLong(int columnId)
+          {
+            final BinarySearchableAccessor column = 
soughtColumns.get(columnId);
+            if (column == null) {
+              return 0;
+            } else {
+              return column.getLong(rowId);
+            }
+          }
+        };
+      }
+
+      @Override
+      public FindResult findRow(
+          int startRowIndex,
+          MatrixRow row
+      )
+      {
+        int start = startRowIndex;
+        int end = numRows();
+
+        for (int i = 0; i < row.length(); ++i) {
+          final BinarySearchableAccessor searcher = soughtColumns.get(i);
+          if (searcher == null) {
+            if (row.isNull(i)) {
+              continue;
+            } else {
+              // We don't have the column at all, that means that we can only 
match `null`.  Given that the data
+              // is sorted, the next possible value is the next value of the 
previous column, so return that as our
+              // end.
+              return FindResult.notFound(end);
+            }
+          }
+
+          final FindResult result;
+          if (row.isNull(i)) {
+            result = searcher.findNull(start, end);
+          } else {
+            switch (searcher.getType().getType()) {
+              case STRING:
+                result = searcher.findString(start, end, (String) 
row.getObject(i));
+                break;
+              case LONG:
+                if (row.isNull(i)) {
+                  result = searcher.findNull(start, end);
+                } else {
+                  result = searcher.findLong(start, end, row.getLong(i));
+                }
+                break;
+              case DOUBLE:
+                result = searcher.findDouble(start, end, row.getDouble(i));
+                break;
+              case FLOAT:
+                result = searcher.findFloat(start, end, row.getFloat(i));
+                break;
+
+              case ARRAY:
+              case COMPLEX:
+                result = searcher.findComplex(start, end, row.getObject(i));
+                break;
+              default:
+                throw new RE("Unknown type[%s]", searcher.getType());
+            }
+          }
+
+          if (result.wasFound()) {
+            start = result.getStartRow();
+            end = result.getEndRow();
+          } else {
+            return FindResult.notFound(result.getNext());
+          }
+        }
+
+        return FindResult.found(start, end);
+      }
+    };
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
new file mode 100644
index 0000000000..2e610b12ee
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * A thing that makes Matrixs in an assumed sorted-fashion.
+ *
+ * This SemanticInterface was created in support of the 
SortedInnerJoinOperator, which was created as an attempt
+ * to ensure that the pausable Operators actually enable us to merge 
independent streams of data.  As such, thought
+ * has gone into it to the extent that it was necessary to make something that 
works, it's entirely possible that
+ * it's a pointless interface and should be re-evaluated when/if the join 
implementation is dusted back off
+ */
+public interface SortedMatrixMaker
+{
+  static SortedMatrixMaker fromRAC(RowsAndColumns rac)
+  {
+    SortedMatrixMaker retVal = rac.as(SortedMatrixMaker.class);
+    if (retVal == null) {
+      return new DefaultSortedMatrixMaker(rac);
+    }
+    return retVal;
+  }
+
+  SortedMatrix make(List<String> columns);
+
+  /**
+   * A matrix thingie
+   */
+  interface SortedMatrix
+  {
+    int numRows();
+
+    MatrixRow getRow(int rowId);
+
+    /**
+     * @param startRowIndex the rowIndex to start the search from.  The return 
value should never be less than this
+     *                      value
+     * @param row           the row to compare against
+     * @return a FindResult null if exists beyond the bounds of the current 
Matrix
+     */
+    FindResult findRow(int startRowIndex, MatrixRow row);
+
+    interface MatrixRow
+    {
+      int length();
+
+      boolean isNull(int columnId);
+
+      @Nullable
+      Object getObject(int columnId);
+
+      double getDouble(int columnId);
+
+      float getFloat(int columnId);
+
+      long getLong(int columnId);
+    }
+  }
+
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
new file mode 100644
index 0000000000..248734a18f
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.util;
+
+public class FindResult
+{
+  public static FindResult found(int startRow, int endRow)
+  {
+    return new FindResult(startRow, endRow, true);
+  }
+
+  public static FindResult notFound(int nextRow)
+  {
+    return new FindResult(nextRow, -1, false);
+  }
+
+  private final int startRow;
+  private final int endRow;
+  private final boolean found;
+
+  private FindResult(
+      int startRow,
+      int endRow,
+      boolean found
+  )
+  {
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.found = found;
+  }
+
+  public boolean wasFound()
+  {
+    return found;
+  }
+
+  public int getStartRow()
+  {
+    return startRow;
+  }
+
+  public int getEndRow()
+  {
+    return endRow;
+  }
+
+  public int getNext()
+  {
+    // We overload the startRow to represent the next meaningful row if the 
value wasn't actually found.
+    return startRow;
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java 
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
index f482cf04f4..edd759a475 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.Closeable;
 
@@ -41,5 +42,5 @@ public interface CloseableShapeshifter extends Closeable
    * through a local implementation of the interface.
    */
   @Nullable
-  <T> T as(Class<T> clazz);
+  <T> T as(@Nonnull Class<T> clazz);
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
 
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
index b4c52e1950..cd515deadb 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
@@ -24,7 +24,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 public class ExceptionalReceiver implements Operator.Receiver
 {
   @Override
-  public boolean push(RowsAndColumns rac)
+  public Operator.Signal push(RowsAndColumns rac)
   {
     throw new UnsupportedOperationException();
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
 
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
index 1d7b68887b..ac9e9e1aa3 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
@@ -20,41 +20,70 @@
 package org.apache.druid.query.operator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.io.Closeable;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
 public class InlineScanOperator implements Operator
 {
-  public static InlineScanOperator make(RowsAndColumns item)
+  public static InlineScanOperator make(RowsAndColumns... item)
   {
-    return new InlineScanOperator(Iterators.singletonIterator(item));
+    return new InlineScanOperator(Arrays.asList(item));
   }
 
   public static InlineScanOperator make(List<RowsAndColumns> items)
   {
-    return new InlineScanOperator(items.iterator());
+    return new InlineScanOperator(items);
   }
 
-  private Iterator<RowsAndColumns> iter;
+  private Iterable<RowsAndColumns> iterable;
 
   public InlineScanOperator(
-      Iterator<RowsAndColumns> iter
+      Iterable<RowsAndColumns> iterable
   )
   {
-    Preconditions.checkNotNull(iter);
-    this.iter = iter;
+    Preconditions.checkNotNull(iterable);
+    this.iterable = iterable;
   }
 
   @Override
-  public void go(Receiver receiver)
+  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
   {
-    boolean keepItGoing = true;
-    while (keepItGoing && iter.hasNext()) {
+    final Iterator<RowsAndColumns> iter;
+    if (continuation == null) {
+      iter = iterable.iterator();
+    } else {
+      iter = ((Continuation) continuation).iter;
+    }
+
+    Signal keepItGoing = Signal.GO;
+    while (keepItGoing == Signal.GO && iter.hasNext()) {
       keepItGoing = receiver.push(iter.next());
     }
-    receiver.completed();
+    if (keepItGoing == Signal.PAUSE && iter.hasNext()) {
+      return new Continuation(iter);
+    } else {
+      receiver.completed();
+      return null;
+    }
+  }
+
+  private static class Continuation implements Closeable
+  {
+    private final Iterator<RowsAndColumns> iter;
+
+    public Continuation(Iterator<RowsAndColumns> iter)
+    {
+      this.iter = iter;
+    }
+
+    @Override
+    public void close()
+    {
+      // We don't actually have anything to close
+    }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
index 875ef52bd3..ffea250f6a 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
@@ -24,14 +24,16 @@ import 
org.apache.druid.java.util.common.guava.YieldingAccumulator;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
 import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.junit.Assert;
 import org.junit.Test;
 
+@SuppressWarnings("ConstantConditions")
 public class OperatorSequenceTest
 {
   @Test
-  public void testAccumulateButNoYielder()
+  public void testAccumulateAndYielderJustOne()
   {
     OperatorSequence seq = new OperatorSequence(
         () -> InlineScanOperator.make(MapOfColumnsRowsAndColumns.of("hi", new 
IntArrayColumn(new int[]{1})))
@@ -52,33 +54,114 @@ public class OperatorSequenceTest
         ).intValue()
     );
 
-    boolean exceptionThrown = false;
-    try {
-      Yielder<Integer> yielder = seq.toYielder(0, new 
YieldingAccumulator<Integer, RowsAndColumns>()
+    Yielder<Integer> yielder = seq.toYielder(0, new 
YieldingAccumulator<Integer, RowsAndColumns>()
+    {
+      @Override
+      public Integer accumulate(Integer accumulated, RowsAndColumns in)
       {
-        @Override
-        public Integer accumulate(Integer accumulated, RowsAndColumns in)
-        {
-          Assert.fail("This should never be called, because we expect a UOE 
before this point");
+        this.yield();
+        helper.validate(in);
+        return accumulated + 1;
+      }
+    });
+
+    Assert.assertFalse(yielder.isDone());
+    Assert.assertEquals(1, yielder.get().intValue());
+
+    yielder = yielder.next(0);
+    Assert.assertTrue(yielder.isDone());
+  }
+
+  @Test
+  public void testAccumulateAndYielderMultiple()
+  {
+    OperatorSequence seq = new OperatorSequence(
+        () -> InlineScanOperator.make(
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{1})),
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{2})),
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{3, 4})),
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{5, 6, 7, 8})),
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{9, 10, 11})),
+            MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new 
int[]{12, 13, 14, 15}))
+        )
+    );
+
+    Assert.assertEquals(
+        120,
+        seq.accumulate(
+            0,
+            (accumulated, in) -> {
+              final ColumnAccessor col = in.findColumn("hi").toAccessor();
+              for (int i = 0; i < col.numRows(); ++i) {
+                accumulated += col.getInt(i);
+              }
+              return accumulated;
+            }
+        ).intValue()
+    );
+
+    // Never yield
+    Yielder<Integer> yielder = seq.toYielder(0, new 
YieldingAccumulator<Integer, RowsAndColumns>()
+    {
+      @Override
+      public Integer accumulate(Integer accumulated, RowsAndColumns in)
+      {
+        final ColumnAccessor col = in.findColumn("hi").toAccessor();
+        for (int i = 0; i < col.numRows(); ++i) {
+          accumulated += col.getInt(i);
+        }
+        return accumulated;
+      }
+    });
+
+    Assert.assertEquals(120, yielder.get().intValue());
+    Assert.assertTrue(yielder.isDone());
+
+    // Yield at the very end...
+    yielder = seq.toYielder(0, new YieldingAccumulator<Integer, 
RowsAndColumns>()
+    {
+      @Override
+      public Integer accumulate(Integer accumulated, RowsAndColumns in)
+      {
+        final ColumnAccessor col = in.findColumn("hi").toAccessor();
+        for (int i = 0; i < col.numRows(); ++i) {
+          accumulated += col.getInt(i);
+        }
+        if (accumulated == 120) {
           this.yield();
-          helper.validate(in);
-          return accumulated + 1;
         }
-      });
+        return accumulated;
+      }
+    });
 
-      // The exception will have been thrown before this point, in which case 
one might wonder why the code here
-      // remains.  It is because this code is a correct validation of what 
should happen if OperatorSequence *did*
-      // implement the Yielder.  It's kept for posterity in case we ever 
choose to implement it using threads.
-      Assert.assertFalse(yielder.isDone());
-      Assert.assertEquals(1, yielder.get().intValue());
+    Assert.assertEquals(120, yielder.get().intValue());
+    Assert.assertFalse(yielder.isDone());
 
+    yielder = yielder.next(0);
+    Assert.assertTrue(yielder.isDone());
+
+    // Aggregate each RAC and yield.
+    yielder = seq.toYielder(0, new YieldingAccumulator<Integer, 
RowsAndColumns>()
+    {
+      @Override
+      public Integer accumulate(Integer accumulated, RowsAndColumns in)
+      {
+        this.yield();
+        final ColumnAccessor col = in.findColumn("hi").toAccessor();
+        for (int i = 0; i < col.numRows(); ++i) {
+          accumulated += col.getInt(i);
+        }
+        return accumulated;
+      }
+    });
+
+    int[] expectedTotals = new int[]{1, 2, 7, 26, 30, 54};
+
+    for (int expectedTotal : expectedTotals) {
+      Assert.assertEquals(expectedTotal, yielder.get().intValue());
+      Assert.assertFalse(yielder.isDone());
       yielder = yielder.next(0);
-      Assert.assertTrue(yielder.isDone());
-    }
-    catch (UnsupportedOperationException ex) {
-      Assert.assertEquals("Cannot convert an Operator to a Yielder", 
ex.getMessage());
-      exceptionThrown = true;
     }
-    Assert.assertTrue(exceptionThrown);
+    Assert.assertTrue(yielder.isDone());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
 
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
index b1bace9aa3..e44bdc5806 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
@@ -42,10 +42,10 @@ public class OperatorTestHelper
           int index = 0;
 
           @Override
-          public boolean push(RowsAndColumns rac)
+          public Operator.Signal push(RowsAndColumns rac)
           {
             helpers[index++].validate(rac);
-            return true;
+            return Operator.Signal.GO;
           }
         }
     ).withFinalValidation(
@@ -61,10 +61,10 @@ public class OperatorTestHelper
           int index = 0;
 
           @Override
-          public boolean push(RowsAndColumns rac)
+          public Operator.Signal push(RowsAndColumns rac)
           {
             helpers[index++].validate(rac);
-            return index < helpers.length;
+            return index < helpers.length ? Operator.Signal.GO : 
Operator.Signal.STOP;
           }
         }
     ).withFinalValidation(
@@ -103,7 +103,7 @@ public class OperatorTestHelper
   public OperatorTestHelper runToCompletion(Operator op)
   {
     TestReceiver receiver = this.receiverSupply.get();
-    op.go(receiver);
+    Operator.go(op, receiver);
     Assert.assertTrue(receiver.isCompleted());
     if (finalValidation != null) {
       finalValidation.accept(receiver);
@@ -113,7 +113,7 @@ public class OperatorTestHelper
 
   public interface JustPushMe
   {
-    boolean push(RowsAndColumns rac);
+    Operator.Signal push(RowsAndColumns rac);
   }
 
   public static class TestReceiver implements Operator.Receiver
@@ -129,7 +129,7 @@ public class OperatorTestHelper
     }
 
     @Override
-    public boolean push(RowsAndColumns rac)
+    public Operator.Signal push(RowsAndColumns rac)
     {
       numPushed.incrementAndGet();
       return pushFn.push(rac);
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
index 0cd1f1b588..f943f31201 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -78,13 +79,14 @@ public class SegmentToRowsAndColumnsOperatorTest
     SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
         new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
           Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          //noinspection ReturnOfNull
           return null;
         })
     );
 
     boolean exceptionThrown = false;
     try {
-      op.go(new ExceptionalReceiver());
+      Operator.go(op, new ExceptionalReceiver());
     }
     catch (ISE e) {
       Assert.assertEquals(e.getMessage(), "Segment[class 
org.apache.druid.segment.TestSegmentForAs] cannot shapeshift");
@@ -104,7 +106,7 @@ public class SegmentToRowsAndColumnsOperatorTest
           {
             @Nullable
             @Override
-            public <T> T as(Class<T> clazz)
+            public <T> T as(@Nonnull Class<T> clazz)
             {
               Assert.assertEquals(RowsAndColumns.class, clazz);
               return null;
@@ -121,7 +123,7 @@ public class SegmentToRowsAndColumnsOperatorTest
 
     boolean exceptionThrown = false;
     try {
-      op.go(new ExceptionalReceiver());
+      Operator.go(op, new ExceptionalReceiver());
     }
     catch (ISE e) {
       Assert.assertEquals(
@@ -147,9 +149,8 @@ public class SegmentToRowsAndColumnsOperatorTest
           return new CloseableShapeshifter()
           {
             @SuppressWarnings("unchecked")
-            @Nullable
             @Override
-            public <T> T as(Class<T> clazz)
+            public <T> T as(@Nonnull Class<T> clazz)
             {
               Assert.assertEquals(RowsAndColumns.class, clazz);
               return (T) expectedRac;
@@ -170,7 +171,7 @@ public class SegmentToRowsAndColumnsOperatorTest
       new OperatorTestHelper()
           .withPushFn(rac -> {
             Assert.assertSame(expectedRac, rac);
-            return true;
+            return Operator.Signal.GO;
           })
           .runToCompletion(op);
     }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
index 02070c2436..b5bcc177aa 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
@@ -38,14 +38,15 @@ public class SequenceOperatorTest
         MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1}))
     )));
 
+    final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
+        .expectColumn("hi", new int[]{1})
+        .allColumnsRegistered();
+
     new OperatorTestHelper()
         .withPushFn(
             rac -> {
-              new RowsAndColumnsHelper()
-                  .expectColumn("hi", new int[]{1})
-                  .allColumnsRegistered()
-                  .validate(rac);
-              return true;
+              helper.validate(rac);
+              return Operator.Signal.GO;
             }
         )
         .withFinalValidation(testReceiver -> Assert.assertEquals(2, 
testReceiver.getNumPushed()))
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
new file mode 100644
index 0000000000..1e728eced2
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.query.operator.join.JoinConfig;
+import org.apache.druid.query.operator.join.JoinPartDefn;
+import org.apache.druid.query.operator.join.SortedInnerJoinOperator;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class SortedInnerJoinOperatorTest
+{
+
+  @Test
+  public void testSimpleTwoWayJoin()
+  {
+    SortedInnerJoinOperator joinMe = new SortedInnerJoinOperator(
+        Arrays.asList(
+            buildDefn(
+                MapOfColumnsRowsAndColumns
+                    .builder()
+                    .add("joinField", new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4})
+                    .add("projectMe", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
+                    .build()
+            )
+                .joinOn("joinField")
+                .project("joinField", "projectMe")
+                .build(),
+            buildDefn(
+                MapOfColumnsRowsAndColumns
+                    .builder()
+                    .add("joinField", new int[]{0, 1, 4})
+                    .add("projectMeToo", ColumnType.STRING, "a", "b", "e")
+                    .build()
+            )
+                .joinOn("joinField")
+                .project("projectMeToo")
+                .build()
+        ),
+        new JoinConfig(4)
+    );
+
+    new OperatorTestHelper()
+        .expectRowsAndColumns(
+            new RowsAndColumnsHelper()
+                .expectColumn("joinField", new int[]{0, 0, 0, 1, 1})
+                .expectColumn("projectMe", new int[]{3, 54, 21, 1, 5})
+                .expectColumn("projectMeToo", ColumnType.STRING, "a", "a", 
"a", "b", "b")
+                .allColumnsRegistered(),
+            new RowsAndColumnsHelper()
+                .expectColumn("joinField", new int[]{4, 4, 4})
+                .expectColumn("projectMe", new int[]{2, 3, 92})
+                .expectColumn("projectMeToo", ColumnType.STRING, "e", "e", "e")
+                .allColumnsRegistered()
+        )
+        .runToCompletion(joinMe);
+  }
+
+  @Test
+  public void testSimpleThreeWayJoin()
+  {
+    SortedInnerJoinOperator joinMe = new SortedInnerJoinOperator(
+        Arrays.asList(
+            buildDefn(
+                MapOfColumnsRowsAndColumns
+                    .builder()
+                    .add("joinField", new double[]{0, 0, 0, 1, 1, 2, 4, 4, 4})
+                    .add("projectMe", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
+                    .build()
+            )
+                .joinOn("joinField")
+                .project("joinField", "projectMe")
+                .build(),
+            buildDefn(
+                MapOfColumnsRowsAndColumns
+                    .builder()
+                    .add("joinField", new double[]{0, 1, 4})
+                    .add("projectMeToo", ColumnType.STRING, "a", "b", "e")
+                    .build()
+            )
+                .joinOn("joinField")
+                .project("projectMeToo")
+                .build(),
+            buildDefn(
+                MapOfColumnsRowsAndColumns
+                    .builder()
+                    .add("joinField", new double[]{0, 0, 1, 2, 4})
+                    .add("projectMeThree", ColumnType.STRING, "C", "CC", "B", 
"Z", "A")
+                    .build()
+            )
+                .joinOn("joinField")
+                .project("projectMeThree")
+                .build()
+        ),
+        new JoinConfig(4)
+    );
+
+    new OperatorTestHelper()
+        .expectRowsAndColumns(
+            new RowsAndColumnsHelper()
+                .expectColumn("joinField", new double[]{0, 0, 0, 0, 0, 0})
+                .expectColumn("projectMe", new int[]{3, 3, 54, 54, 21, 21})
+                .expectColumn("projectMeToo", ColumnType.STRING, "a", "a", 
"a", "a", "a", "a")
+                .expectColumn("projectMeThree", ColumnType.STRING, "C", "CC", 
"C", "CC", "C", "CC")
+                .allColumnsRegistered(),
+            new RowsAndColumnsHelper()
+                .expectColumn("joinField", new double[]{1, 1, 4, 4, 4})
+                .expectColumn("projectMe", new int[]{1, 5, 2, 3, 92})
+                .expectColumn("projectMeToo", ColumnType.STRING, "b", "b", 
"e", "e", "e")
+                .expectColumn("projectMeThree", ColumnType.STRING, "B", "B", 
"A", "A", "A")
+                .allColumnsRegistered()
+        )
+        .runToCompletion(joinMe);
+  }
+
+  private JoinPartDefn.Builder buildDefn(
+      RowsAndColumns... racs
+  )
+  {
+    return JoinPartDefn.builder(InlineScanOperator.make(racs));
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
index dc2e6e9732..81b684fdc3 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
@@ -35,12 +35,9 @@ import java.util.Set;
 /**
  * Tests the WindowOperatorQuery, it would actually be a lot better to run 
this through some tests that actually
  * validate the operation of queries, but all of the efforts to build out test 
scaffolding and framework have gone
- * into building things out for SQL query operations.  As such, all of the 
tests that validating the actual native
- * functionality actually run from the `druid-sql` module instead of this 
module.  It would be best to de-couple
- * these and have all of the native, query processing tests happen directly 
here in processing and have the SQL
- * tests only concern themselves with how they plan SQL into Native, but 
that's a bit big of a nugget to bite off
- * at this point in time, so instead we continue the building of technical 
debt by making this "test" run lines
- * of code without actually testing much meaningful behavior.
+ * into building things out for SQL query operations.  As such, all of the 
tests validating the actual native
+ * functionality actually run from the `druid-sql` module instead of this 
module.  It would be really cool to move
+ * the SQL stuff into the processing module so that it can be handled in the 
same location here.
  * <p>
  * For now, view CalciteWindowQueryTest for actual tests that validate 
behavior.
  */
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
index 73d3060b80..b3a4716432 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
@@ -61,7 +61,7 @@ public class WindowProcessorOperatorTest
         .withPushFn(
             rowsAndColumns -> {
               Assert.assertSame(rac, rowsAndColumns);
-              return true;
+              return Operator.Signal.GO;
             }
         )
         .runToCompletion(op);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 65421b4078..ee370aa5b6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -39,11 +39,11 @@ import org.apache.druid.query.operator.ColumnWithDirection;
 import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
 import org.apache.druid.query.operator.NaiveSortOperatorFactory;
 import org.apache.druid.query.operator.OperatorFactory;
-import org.apache.druid.query.operator.WindowOperatorFactory;
 import org.apache.druid.query.operator.window.ComposingProcessor;
 import org.apache.druid.query.operator.window.Processor;
 import org.apache.druid.query.operator.window.WindowFrame;
 import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
 import org.apache.druid.query.operator.window.ranking.WindowCumeDistProcessor;
 import org.apache.druid.query.operator.window.ranking.WindowDenseRankProcessor;
 import 
org.apache.druid.query.operator.window.ranking.WindowPercentileProcessor;
@@ -380,6 +380,9 @@ public class Windowing
     {
       RexNode columnArgument = Expressions.fromFieldAccess(sig, project, 
call.getArgList().get(argPosition));
       final DruidExpression expression = 
Expressions.toDruidExpression(context, sig, columnArgument);
+      if (expression == null) {
+        throw new ISE("Couldn't get an expression from columnArgument[%s]", 
columnArgument);
+      }
       return expression.getDirectColumn();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to