This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 706b8a0227 Adjust Operators to be Pausable (#13694)
706b8a0227 is described below
commit 706b8a02270e68090ebcdd67a170df0007454bd2
Author: imply-cheddar <[email protected]>
AuthorDate: Tue Jan 24 13:52:06 2023 +0900
Adjust Operators to be Pausable (#13694)
* Adjust Operators to be Pausable
This enables "merge" style operations that
combine multiple streams.
This change includes a naive implementation
of one such merge operator just to provide
concrete evidence that the refactoring is
effective.
---
.../druid/collections/fastutil/DruidIntList.java | 99 ++++
.../collections/fastutil/DruidIntListTest.java | 97 ++++
.../query/operator/NaivePartitioningOperator.java | 12 +-
.../druid/query/operator/NaiveSortOperator.java | 10 +-
.../org/apache/druid/query/operator/Operator.java | 104 +++-
.../druid/query/operator/OperatorFactory.java | 1 +
.../druid/query/operator/OperatorSequence.java | 87 +++-
.../operator/SegmentToRowsAndColumnsOperator.java | 6 +-
.../druid/query/operator/SequenceOperator.java | 107 +++-
.../query/operator/WindowProcessorOperator.java | 33 +-
.../druid/query/operator/join/JoinConfig.java} | 25 +-
.../druid/query/operator/join/JoinPartDefn.java | 93 ++++
.../operator/join/SortedInnerJoinOperator.java | 558 +++++++++++++++++++++
.../{ => window}/WindowOperatorFactory.java | 6 +-
.../rowsandcols/MapOfColumnsRowsAndColumns.java | 46 ++
.../rowsandcols/RearrangedRowsAndColumns.java | 38 +-
.../column/BinarySearchableAccessor.java | 54 ++
.../query/rowsandcols/column/ColumnAccessor.java | 50 +-
.../rowsandcols/column/ConstantObjectColumn.java | 158 +++---
.../rowsandcols/column/DoubleArrayColumn.java | 178 ++++---
.../query/rowsandcols/column/IntArrayColumn.java | 184 ++++---
.../druid/query/rowsandcols/column/NullColumn.java | 39 +-
.../rowsandcols/column/ObjectArrayColumn.java | 116 ++++-
.../semantic/DefaultSortedMatrixMaker.java | 204 ++++++++
.../rowsandcols/semantic/SortedMatrixMaker.java | 83 +++
.../druid/query/rowsandcols/util/FindResult.java | 69 +++
.../druid/segment/CloseableShapeshifter.java | 3 +-
.../druid/query/operator/ExceptionalReceiver.java | 2 +-
.../druid/query/operator/InlineScanOperator.java | 53 +-
.../druid/query/operator/OperatorSequenceTest.java | 127 ++++-
.../druid/query/operator/OperatorTestHelper.java | 14 +-
.../SegmentToRowsAndColumnsOperatorTest.java | 13 +-
.../druid/query/operator/SequenceOperatorTest.java | 11 +-
.../operator/SortedInnerJoinOperatorTest.java | 144 ++++++
.../query/operator/WindowOperatorQueryTest.java | 9 +-
.../operator/WindowProcessorOperatorTest.java | 2 +-
.../apache/druid/sql/calcite/rel/Windowing.java | 5 +-
37 files changed, 2478 insertions(+), 362 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java
b/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java
new file mode 100644
index 0000000000..d732c97235
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/collections/fastutil/DruidIntList.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections.fastutil;
+
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrays;
+
+public class DruidIntList extends IntArrayList
+{
+ public DruidIntList(int capacity)
+ {
+ super(capacity);
+ }
+
+ public void addArray(int[] vals)
+ {
+ grow(size + vals.length);
+ System.arraycopy(vals, 0, a, size, vals.length);
+ size += vals.length;
+ }
+
+ public void fill(int val, int count)
+ {
+ grow(size + count);
+ java.util.Arrays.fill(a, size, size + count, val);
+ size += count;
+ }
+
+ public void fillWithRepeat(int[] vals, int repeat)
+ {
+ int count = vals.length * repeat;
+ grow(size + count);
+ for (int i = 0; i < count; i += vals.length) {
+ System.arraycopy(vals, 0, a, size + i, vals.length);
+ }
+ size += count;
+ }
+
+ public void fillRuns(int[] vals, int runLength, int repeat)
+ {
+ if (runLength == 1) {
+ fillWithRepeat(vals, repeat);
+ return;
+ }
+ int count = vals.length * runLength * repeat;
+ grow(size + count);
+ for (int i = 0; i < repeat; ++i) {
+ for (int val : vals) {
+ // there's a += hidden in there, don't be tricked!
+ java.util.Arrays.fill(a, size, size += runLength, val);
+ }
+ }
+ }
+
+ public void resetToSize(int targetSize)
+ {
+ a = new int[targetSize];
+ size = 0;
+ }
+
+ /**
+ * Method gratuitously "borrowed" from IntArrayList, would've been nice if
that method wasn't private, but ah well.
+ *
+ * @param capacity the capacity to grow to
+ * @see IntArrayList#grow(int)
+ */
+ @SuppressWarnings("ArrayEquality")
+ private void grow(int capacity)
+ {
+ if (capacity <= a.length) {
+ return;
+ }
+ if (a != IntArrays.DEFAULT_EMPTY_ARRAY) {
+ capacity = (int) Math.max(Math.min((long) a.length + (a.length >> 1),
Arrays.MAX_ARRAY_SIZE), capacity);
+ } else if (capacity < DEFAULT_INITIAL_CAPACITY) {
+ capacity = DEFAULT_INITIAL_CAPACITY;
+ }
+ a = IntArrays.forceCapacity(a, capacity, size);
+ assert size <= a.length;
+ }
+}
diff --git
a/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
b/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
new file mode 100644
index 0000000000..3dd6862867
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/collections/fastutil/DruidIntListTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections.fastutil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DruidIntListTest
+{
+
+ @Test
+ public void addArray()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ final int[] arrrrr = {0, 1, 2, 3};
+ intList.addArray(arrrrr);
+ expectEquals(intList, arrrrr);
+ }
+
+ @Test
+ public void fill()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ intList.fill(2, 4);
+
+ expectEquals(intList, new int[]{2, 2, 2, 2});
+ }
+
+ @Test
+ public void fillWithRepeat()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ intList.fillWithRepeat(new int[]{0, 1}, 2);
+
+ expectEquals(intList, new int[]{0, 1, 0, 1});
+ }
+
+ @Test
+ public void fillRuns()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ intList.fillRuns(new int[]{0, 1}, 2, 2);
+
+ expectEquals(intList, new int[]{0, 0, 1, 1, 0, 0, 1, 1});
+ }
+
+ @Test
+ public void fillRunsRunLengthOne()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ intList.fillRuns(new int[]{0, 1}, 1, 2);
+
+ expectEquals(intList, new int[]{0, 1, 0, 1});
+ }
+
+ @Test
+ public void resetToSize()
+ {
+ DruidIntList intList = new DruidIntList(2);
+
+ intList.fill(2, 4);
+
+ Assert.assertEquals(4, intList.size());
+ intList.resetToSize(4);
+ Assert.assertEquals(0, intList.size());
+ }
+
+ private void expectEquals(DruidIntList intList, int[] expected)
+ {
+ Assert.assertEquals(expected.length, intList.size());
+ for (int i = 0; i < expected.length; ++i) {
+ Assert.assertEquals(String.valueOf(i), expected[i], intList.getInt(i));
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
index 0e9e951418..90000a9fa6 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
@@ -23,6 +23,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import
org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
+import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
@@ -52,13 +53,14 @@ public class NaivePartitioningOperator implements Operator
}
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
- child.go(
+ return child.goOrContinue(
+ continuation,
new Receiver()
{
@Override
- public boolean push(RowsAndColumns rac)
+ public Signal push(RowsAndColumns rac)
{
ClusteredGroupPartitioner groupPartitioner =
rac.as(ClusteredGroupPartitioner.class);
if (groupPartitioner == null) {
@@ -67,8 +69,8 @@ public class NaivePartitioningOperator implements Operator
partitionsIter =
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
- boolean keepItGoing = true;
- while (keepItGoing && partitionsIter.hasNext()) {
+ Signal keepItGoing = Signal.GO;
+ while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
keepItGoing = receiver.push(partitionsIter.next());
}
return keepItGoing;
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
index 615d18df86..e11da38421 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;
+import java.io.Closeable;
import java.util.ArrayList;
/**
@@ -44,22 +45,23 @@ public class NaiveSortOperator implements Operator
}
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
- child.go(
+ return child.goOrContinue(
+ continuation,
new Receiver()
{
NaiveSortMaker.NaiveSorter sorter = null;
@Override
- public boolean push(RowsAndColumns rac)
+ public Signal push(RowsAndColumns rac)
{
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
} else {
sorter.moreData(rac);
}
- return true;
+ return Signal.GO;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
index ecc0765e74..a9a18c36d5 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
@@ -21,6 +21,9 @@ package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
/**
* An Operator interface that intends to have implementations that align
relatively closely with the Operators that
* other databases would also tend to be implemented using. While a lot of
Operator interfaces tend to use a
@@ -43,11 +46,100 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
public interface Operator
{
/**
- * Tells the Operation to start doing its work. Data will be pushed into
the Receiver.
+ * Convenience method to run an Operator until completion. Data will be
pushed into the Receiver. This is the
+ * primary entry point that users of Operators will call to do their work.
*
+ * @param op the operator to run to completion
* @param receiver a receiver that will receive data
*/
- void go(Receiver receiver);
+ static void go(Operator op, Receiver receiver)
+ {
+ Closeable continuation = null;
+ do {
+ continuation = op.goOrContinue(continuation, receiver);
+ } while (continuation != null);
+ }
+
+ /**
+ * This is the primary workhorse method of an Operator. That said, users of
Operators are not expected to use this
+ * method and instead are expected to call the static method {@link
Operator#go(Operator, Receiver)}.
+ * <p>
+ * Data will be pushed into the Receiver. The Receiver has the option of
returning any of the {@link Signal} signals
+ * to indicate its degree of readiness for more data to be received.
+ * <p>
+ * If a Receiver returns a {@link Signal#PAUSE} signal, then if there is
processing left to do, then it is expected
+ * that a non-null "continuation" object will be returned. This allows for
flow control to be returned to the
+ * caller to, e.g., process another Operator or just exert backpressure. In
this case, when the controller wants to
+ * resume, it must call this method again and include the continuation
object that it received.
+ * <p>
+ * The continuation object is Closeable because it is possible that while
processing is paused on one Operator, the
+ * processing of another Operator could obviate the need for further
processing. In this case, instead of resuming
+ * the paused Operation and returning {@link Signal#STOP} on the next push
into the Receiver, the code must
+ * call {@link Closeable#close()} on the continuation object to cancel all
further processing and clean up all
+ * related resources. If, instead, the continuation object is passed back
into a call to goOrContinue, then
+ * close() must <em>NOT</em> be called on the continuation object. Said
again, the controller must either
+ * 1) pass the continuation object back into a call to goOrContinue, OR
+ * 2) call close() on the continuation object
+ * and <em>NEVER</em> do both.
+ * <p>
+ * Once a reference to a continuation object has been passed back to a
goOrContinue method, it should never be
+ * reused by the controller. This is to give Operator implementations the
ability to decide whether it makes sense
+ * to reuse the objects on subsequent calls or create new ones.
+ * <p>
+ * A null return value from this method indicates that processing is
complete. The Receiver should have had its
+ * {@link Receiver#completed()} method called and any resources associated
with processing have already been cleaned
+ * up. Additionally, if an exception escapes a call to this method, any
resources associated with processing should have
+ * been cleaned up.
+ * <p>
+ * For implementors of the interface, if an Operator does not have any
resources of its own to clean up, then it is
+ * safe to just pass through the continuation object to the caller.
However, if there are resources associated with
+ * the processing that must be cleaned up, the Operator implementation must
wrap the received Closeable in a new
+ * Closeable that will close those resources. In this case, when the object
comes back to the Operator on a call to
+ * goOrContinue, the Operator must unwrap the internal Closeable and pass
that back down. In a similar fashion,
+ * if there is any state that an Operator requires to be able to resume its
processing, then it is expected that the
+ * Operator will cast the object back to an instance of the type that it had
originally returned.
+ *
+ * @param receiver a receiver that will receive data
+ * @return null if processing is complete, non-null if the Receiver returned
a {@link Signal#PAUSE} signal
+ */
+ @Nullable
+ Closeable goOrContinue(Closeable continuationObject, Receiver receiver);
+
+ /**
+ * This is the return object from a receiver. It is used to communicate to
whatever is pushing the data into the
+ * receiver the state of processing. This exists because Operators can
sometimes decide that no more results will
+ * be needed (e.g. if the result set is being limited), in which case, they
need some way to communicate this
+ * to downstream processing to effectively "cancel" further computation.
+ * <p>
+ * It's named weird because... well, the author had a hard time coming up
with a meaningful name. Suggestions
+ * for alternate names are welcome.
+ */
+ enum Signal
+ {
+ /**
+ * Indicates that the downstream processing need not do anything else.
Operators that return this should avoid
+ * pre-emptively calling {@link Receiver#completed()} before returning
STOP. They should instead return STOP
+ * and trust that the downstream code will call {@link
Receiver#completed()}. This is because downstream code
+ * *might* be pipelining computations to prepare the next set of data and
if the Operator first calls
+ * {@link Receiver#completed()} before communicating that no further
results are needed, it delays the canceling
+ * of the pipelined operations and effectively wastes CPU cycles.
+ */
+ STOP,
+ /**
+ * Indicates that the downstream processing should pause its pushing of
results and instead return a
+ * continuation object that encapsulates whatever state is required to
resume processing. When this signal is
+ * received, Operators that are generating data might choose to exert
backpressure or otherwise pause their
+ * processing efforts until called again with the returned continuation
object.
+ * <p>
+ * If an Operator has completed its processing already when this signal is
received, instead of returning a
+ * continuation object, it should call {@link Receiver#completed()} and
return null.
+ */
+ PAUSE,
+ /**
+ * Indicates that more data is welcome.
+ */
+ GO
+ }
interface Receiver
{
@@ -59,15 +151,15 @@ public interface Operator
* @return a boolean value indicating if more data will be accepted. If
false, push should never be called
* anymore
*/
- boolean push(RowsAndColumns rac);
+ Signal push(RowsAndColumns rac);
/**
* Used to indicate that no more data will ever come. This is only used
during the happy path and is not
- * equivalent to a {@link java.io.Closeable#close()} method. Namely,
there is no guarantee that this method
+ * equivalent to a {@link Closeable#close()} method. Namely, there is no
guarantee that this method
* will be called if execution halts due to an exception from push.
- *
+ * <p>
* It is acceptable for an implementation to eagerly close resources from
this method, but it is not acceptable
- * for this method to be the sole method of managing the lifecycle of
resources held by the Operator
+ * for this method to be the sole method of managing the lifecycle of
resources held by the Operator.
*/
void completed();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
index 3d7db4af48..90bd30862c 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.operator;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
/**
* A factory for Operators. This class exists to encapsulate the
user-definition of an Operator. I.e. which operator,
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
index 06fe9db8e7..9fcec5d552 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
@@ -22,20 +22,16 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.function.Supplier;
/**
- * Provides a sequence on top of Operators. The mis-match in pull (Sequence)
and push (Operator) means that, if we
- * choose to support the Yielder interface, we have to use threading.
Managing extra threads in order to do that
- * is unfortunate, so, we choose to take a bit of a cop-out approach.
- *
- * Specifically, the accumulate method doesn't actually have the same problem
and the query pipeline after the merge
- * functions is composed of Sequences that all use accumulate instead of
yielder. Thus, if we are certain that
- * we only use the OperatorSequence in places where toYielder is not called
(i.e. it's only used as the return
- * value of the merge() calls), then we can get away with only implementing
the accumulate path.
+ * Provides a sequence on top of Operators.
*/
public class OperatorSequence implements Sequence<RowsAndColumns>
{
@@ -54,8 +50,18 @@ public class OperatorSequence implements
Sequence<RowsAndColumns>
Accumulator<OutType, RowsAndColumns> accumulator
)
{
- final MyReceiver<OutType> receiver = new MyReceiver<>(initValue,
accumulator);
- opSupplier.get().go(receiver);
+ final MyReceiver<OutType> receiver = new MyReceiver<>(
+ initValue,
+ new YieldingAccumulator<OutType, RowsAndColumns>()
+ {
+ @Override
+ public OutType accumulate(OutType accumulated, RowsAndColumns in)
+ {
+ return accumulator.accumulate(accumulated, in);
+ }
+ }
+ );
+ Operator.go(opSupplier.get(), receiver);
return receiver.getRetVal();
}
@@ -65,32 +71,79 @@ public class OperatorSequence implements
Sequence<RowsAndColumns>
YieldingAccumulator<OutType, RowsAndColumns> accumulator
)
{
- // As mentioned in the class-level javadoc, we skip this implementation
and leave it up to the developer to
- // only use this class in "safe" locations.
- throw new UnsupportedOperationException("Cannot convert an Operator to a
Yielder");
+ final Operator op = opSupplier.get();
+ final MyReceiver<OutType> receiver = new MyReceiver<>(initValue,
accumulator);
+ final Closeable finalContinuation = op.goOrContinue(null, receiver);
+ if (finalContinuation == null && !accumulator.yielded()) {
+ // We finished processing, and the accumulator did not yield, so we
return a done yielder with our value
+ return Yielders.done(receiver.getRetVal(), null);
+ } else {
+ return new Yielder<OutType>()
+ {
+ private Closeable continuation = finalContinuation;
+
+ @Override
+ public OutType get()
+ {
+ return receiver.getRetVal();
+ }
+
+ @Override
+ public Yielder<OutType> next(OutType initValue)
+ {
+ if (continuation == null) {
+ // This means that we completed processing on the previous run.
In this case, we are all done
+ return Yielders.done(null, null);
+ }
+ receiver.setRetVal(initValue);
+
+ continuation = op.goOrContinue(continuation, receiver);
+ return this;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (continuation != null) {
+ continuation.close();
+ }
+ }
+ };
+ }
}
private static class MyReceiver<OutType> implements Operator.Receiver
{
- private final Accumulator<OutType, RowsAndColumns> accumulator;
+ private final YieldingAccumulator<OutType, RowsAndColumns> accumulator;
private OutType retVal;
- public MyReceiver(OutType initValue, Accumulator<OutType, RowsAndColumns>
accumulator)
+ public MyReceiver(OutType initValue, YieldingAccumulator<OutType,
RowsAndColumns> accumulator)
{
this.accumulator = accumulator;
retVal = initValue;
}
+ public void setRetVal(OutType retVal)
+ {
+ this.retVal = retVal;
+ }
+
public OutType getRetVal()
{
return retVal;
}
@Override
- public boolean push(RowsAndColumns rac)
+ public Operator.Signal push(RowsAndColumns rac)
{
retVal = accumulator.accumulate(retVal, rac);
- return true;
+ return accumulator.yielded() ? Operator.Signal.PAUSE :
Operator.Signal.GO;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
index 67a847d81d..e60589497d 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
@@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.Segment;
+import java.io.Closeable;
import java.io.IOException;
public class SegmentToRowsAndColumnsOperator implements Operator
@@ -39,7 +40,7 @@ public class SegmentToRowsAndColumnsOperator implements
Operator
}
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
try (final CloseableShapeshifter shifty =
segment.as(CloseableShapeshifter.class)) {
if (shifty == null) {
@@ -50,11 +51,14 @@ public class SegmentToRowsAndColumnsOperator implements
Operator
if (rac == null) {
throw new ISE("Cannot work with segment of type[%s]",
segment.getClass());
}
+
+ // After pushing in a single object, we are done, so ignore the signal
and call completed()
receiver.push(rac);
receiver.completed();
}
catch (IOException e) {
throw new RE(e, "Problem closing resources for segment[%s]",
segment.getId());
}
+ return null;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
index 4014a10e6c..7f42a0dbf2 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
@@ -19,11 +19,20 @@
package org.apache.druid.query.operator;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import java.io.Closeable;
+import java.io.IOException;
+
public class SequenceOperator implements Operator
{
+ private static final Logger log = new Logger(SequenceOperator.class);
+
private final Sequence<RowsAndColumns> child;
public SequenceOperator(
@@ -33,16 +42,96 @@ public class SequenceOperator implements Operator
this.child = child;
}
+ @SuppressWarnings("unchecked")
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
- child.accumulate(
- null,
- (accumulated, in) -> {
- receiver.push(in);
- return accumulated;
- }
- );
- receiver.completed();
+ Yielder<Signal> yielder = null;
+ final Signal theSignal;
+ if (continuation == null) {
+ yielder = child.toYielder(Signal.GO, new YieldingAccumulator<Signal,
RowsAndColumns>()
+ {
+ @Override
+ public Signal accumulate(Signal accumulated, RowsAndColumns in)
+ {
+ final Signal pushSignal = receiver.push(in);
+ switch (pushSignal) {
+ case PAUSE:
+ this.yield();
+ return Signal.PAUSE;
+ case GO:
+ return Signal.GO;
+ case STOP:
+ this.yield();
+ return Signal.STOP;
+ default:
+ throw new ISE("How can this be happening? signal[%s]",
pushSignal);
+ }
+ }
+ });
+ theSignal = yielder.get();
+ } else {
+ try {
+ final Yielder<Signal> castedYielder = (Yielder<Signal>) continuation;
+ if (castedYielder.isDone()) {
+ throw new ISE(
+ "The yielder is done! The previous go call should've resulted
in completion instead of continuation"
+ );
+ }
+ yielder = castedYielder.next(Signal.GO);
+ theSignal = yielder.get();
+ }
+ catch (ClassCastException e) {
+ try {
+ if (yielder == null) {
+ // Got the exception when casting the continuation, close the
continuation and move on.
+ continuation.close();
+ } else {
+ // Got the exception when reading the result from the
continuation, close the yielder and move on.
+ yielder.close();
+ }
+ }
+ catch (IOException ex) {
+ e.addSuppressed(
+ new ISE("Unable to close continuation[%s] of type[%s]",
continuation, continuation.getClass())
+ );
+ }
+ throw e;
+ }
+ }
+
+ switch (theSignal) {
+ // We get GO from the yielder if the last push created a GO and there
was nothing left in the sequence.
+ // I.e. we are done
+ case GO:
+ case STOP:
+ try {
+ receiver.completed();
+ }
+ catch (RuntimeException e) {
+ try {
+ yielder.close();
+ }
+ catch (IOException ioException) {
+ e.addSuppressed(ioException);
+ throw e;
+ }
+ }
+
+ try {
+ yielder.close();
+ }
+ catch (IOException e) {
+ // We got an exception when closing after we received a STOP signal
and successfully called completed().
+ // This means that the Receiver has already done what it needs, so
instead of throwing the exception and
+ // potentially impact processing, we log instead and allow
processing to continue.
+ log.warn(e, "Exception thrown when closing yielder. Logging and
ignoring because results should be fine.");
+ }
+ return null;
+ case PAUSE:
+ return yielder;
+ }
+ throw new ISE("How can this happen!? signal[%s]", theSignal);
}
+
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
index 02bf780f55..4c4caa944c 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
@@ -22,6 +22,8 @@ package org.apache.druid.query.operator;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import java.io.Closeable;
+
/**
* An Operator that applies a {@link Processor}, see javadoc on that interface
for an explanation.
*/
@@ -40,21 +42,24 @@ public class WindowProcessorOperator implements Operator
}
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
- child.go(new Receiver()
- {
- @Override
- public boolean push(RowsAndColumns rac)
- {
- return receiver.push(windowProcessor.process(rac));
- }
+ return child.goOrContinue(
+ continuation,
+ new Receiver()
+ {
+ @Override
+ public Signal push(RowsAndColumns rac)
+ {
+ return receiver.push(windowProcessor.process(rac));
+ }
- @Override
- public void completed()
- {
- receiver.completed();
- }
- });
+ @Override
+ public void completed()
+ {
+ receiver.completed();
+ }
+ }
+ );
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
similarity index 69%
copy from
processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
copy to
processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
index b4c52e1950..f60e450283 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinConfig.java
@@ -17,21 +17,26 @@
* under the License.
*/
-package org.apache.druid.query.operator;
+package org.apache.druid.query.operator.join;
-import org.apache.druid.query.rowsandcols.RowsAndColumns;
-
-public class ExceptionalReceiver implements Operator.Receiver
+public class JoinConfig
{
- @Override
- public boolean push(RowsAndColumns rac)
+ private final int releaseSize;
+
+ public JoinConfig(
+ int releaseSize
+ )
+ {
+ this.releaseSize = releaseSize;
+ }
+
+ public int getReleaseSize()
{
- throw new UnsupportedOperationException();
+ return releaseSize;
}
- @Override
- public void completed()
+ public int getBufferSize()
{
- throw new UnsupportedOperationException();
+ return Math.min(releaseSize, releaseSize + (releaseSize >>> 4)); // +
1/16th
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
new file mode 100644
index 0000000000..9594175c71
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/operator/join/JoinPartDefn.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.join;
+
+import org.apache.druid.query.operator.Operator;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class JoinPartDefn
+{
+ public static Builder builder(Operator op)
+ {
+ return new Builder(op);
+ }
+
+
+ private final Operator op;
+ private final List<String> joinFields;
+ private final List<String> projectFields;
+
+ public JoinPartDefn(
+ Operator op,
+ List<String> joinFields,
+ List<String> projectFields
+ )
+ {
+ this.op = op;
+ this.joinFields = joinFields;
+ this.projectFields = projectFields;
+ }
+
+ public Operator getOp()
+ {
+ return op;
+ }
+
+ public List<String> getJoinFields()
+ {
+ return joinFields;
+ }
+
+ public List<String> getProjectFields()
+ {
+ return projectFields;
+ }
+
+ public static class Builder
+ {
+ private final Operator op;
+ private List<String> joinFields;
+ private List<String> projectFields;
+
+ public Builder(Operator op)
+ {
+ this.op = op;
+ }
+
+ public Builder joinOn(String... fields)
+ {
+ joinFields = Arrays.asList(fields);
+ return this;
+ }
+
+ public Builder project(String... fields)
+ {
+ projectFields = Arrays.asList(fields);
+ return this;
+ }
+
+ public JoinPartDefn build()
+ {
+ return new JoinPartDefn(op, joinFields, projectFields);
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
new file mode 100644
index 0000000000..2468d79f78
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/operator/join/SortedInnerJoinOperator.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator.join;
+
+import org.apache.druid.collections.fastutil.DruidIntList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RearrangedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.semantic.SortedMatrixMaker;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An operator that can join the data streams from other operators. Data must
be provided in the same sorted
+ * order between the different operators.
+ * <p>
+ * Performs an InnerJoin based on the equality of two sets of fields. Null is
considered a meaningful value
+ * when comparing the two streams. If null is intended to be excluded, it
should be removed through a filter.
+ * <p>
+ * This class was created more as an exercise in ensuring that something
meaningful can be made to do combinations
+ * of Operators (and ensure the interface is correct). There are tests that
show that this class works, but those
+ * tests are not (yet) considered exhaustive. This paragraph in the comments
should exist as a cautionary indication
+ * that if/when this class is dusted off for use again, there might be bugs
yet lurking and it should likely start
+ * with fleshing out the tests.
+ */
+public class SortedInnerJoinOperator implements Operator
+{
+ private static final Logger log = new Logger(SortedInnerJoinOperator.class);
+
+ private final ArrayList<JoinPart> parts;
+ private final JoinConfig config;
+
+ public SortedInnerJoinOperator(
+ List<JoinPartDefn> partDefns,
+ JoinConfig config
+ )
+ {
+ this.parts = new ArrayList<>(partDefns.size());
+ for (JoinPartDefn partDefn : partDefns) {
+ parts.add(new JoinPart(partDefn.getOp(), partDefn.getJoinFields(),
partDefn.getProjectFields()));
+ }
+
+ this.config = config;
+ }
+
+ @Override
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+ {
+ JoinLogic joinLogic;
+ if (continuation == null) {
+ joinLogic = new JoinLogic(config, parts);
+ } else {
+ joinLogic = (JoinLogic) continuation;
+ }
+
+ try {
+ joinLogic.go(receiver);
+ switch (joinLogic.state) {
+ case NEEDS_DATA:
+ case READY:
+ throw new ISE("joinLogic.go() exited with state[%s], should never
happen.", joinLogic.state);
+ case COMPLETE:
+ return null;
+ case PAUSED:
+ return joinLogic;
+ default:
+ throw new ISE("Unknown state[%s]", joinLogic.state);
+ }
+ }
+ catch (RuntimeException e) {
+ try {
+ joinLogic.close();
+ }
+ catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
+ throw e;
+ }
+ }
+
+ private static class JoinLogic implements Closeable
+ {
+ private final JoinConfig config;
+
+ private final ArrayList<JoinPart> joinParts;
+
+ private State state;
+ private int nextPositionToLoad;
+
+ private JoinLogic(
+ JoinConfig config,
+ ArrayList<JoinPart> joinParts
+ )
+ {
+ this.config = config;
+ this.joinParts = joinParts;
+
+ setNextPositionToLoad(joinParts.size() - 1);
+ }
+
+ public void go(Receiver receiver)
+ {
+ while (state != State.COMPLETE) {
+ final int position = nextPositionToLoad;
+ final JoinPart joinPart = joinParts.get(position);
+ //noinspection VariableNotUsedInsideIf
+ if (joinPart.curr != null) {
+ throw new ISE("loading data for position[%d], but it already had
data!? Probably a bug!", position);
+ }
+
+ joinPart.goOrContinue(new Receiver()
+ {
+ @Override
+ public Signal push(RowsAndColumns rac)
+ {
+ joinPart.setCurr(rac);
+ process(receiver);
+
+ switch (state) {
+ case COMPLETE:
+ return Signal.STOP;
+ case NEEDS_DATA:
+ return Signal.GO;
+ case PAUSED:
+ return Signal.PAUSE;
+ case READY:
+ throw new ISE("Was in state READY after process returned!?");
+ default:
+ throw new ISE("Unknown state[%s]", state);
+ }
+ }
+
+ @Override
+ public void completed()
+ {
+ joinPart.complete.set(true);
+ }
+ });
+
+ if (joinPart.continuation == null && joinPart.needsData() &&
joinPart.isComplete()) {
+ // In this case, (1) the op returned null, (2) we don't have
anything buffered to process and (3) completed
+ // was called. We are done.
+ state = State.COMPLETE;
+ }
+
+ switch (state) {
+ case READY:
+ throw new ISE("Don't expect READY state here, process() should've
changed it to something else");
+ case PAUSED:
+ return;
+ case COMPLETE:
+ receiver.completed();
+
+ try {
+ close();
+ }
+ catch (IOException e) {
+ log.warn("Problem closing a join part[%d], ignoring because we
are done anyway.", position);
+ }
+ break;
+ case NEEDS_DATA:
+ break;
+ default:
+ throw new ISE("Unknown state[%s]", state);
+ }
+ }
+ }
+
+ /**
+     * Processes whatever it can from the buffers, pushes the created
RowsAndColumns to the receiver and
+     * records which join part, if any, more data is needed from.
+     * <p>
+     * updates {@link #nextPositionToLoad} (moving to NEEDS_DATA) when
another position must be loaded;
+     * sets the state to COMPLETE when processing is done.
+ */
+ private void process(Receiver receiver)
+ {
+ // First check that we have something to work with for all parts of the
join
+ for (int i = joinParts.size() - 1; i >= 0; --i) {
+ final JoinPart joinPart = joinParts.get(i);
+ if (joinPart.needsData()) {
+ if (joinPart.isComplete()) {
+ state = State.COMPLETE;
+ } else {
+ setNextPositionToLoad(i);
+ }
+ return;
+ }
+ }
+ state = State.READY;
+
+ DruidIntList[] rowsToInclude = new DruidIntList[joinParts.size()];
+ for (int i = 0; i < rowsToInclude.length; ++i) {
+ rowsToInclude[i] = new DruidIntList(config.getBufferSize());
+ if (joinParts.get(i).needsData()) {
+ throw new ISE("doJoin called while joinPart[%d] needed data. This
is likely a bug", i);
+ }
+ }
+
+ final int finalIndex = joinParts.size() - 1;
+ final JoinPart finalPart = joinParts.get(finalIndex);
+ SortedMatrixMaker.SortedMatrix.MatrixRow row = null;
+ while (state == State.READY) {
+ if (row == null) {
+ row = finalPart.currMatrix.getRow(finalPart.currRowIndex);
+ }
+
+ row = joinRows(receiver, finalIndex, rowsToInclude, row);
+ }
+
+ pushRows(receiver, rowsToInclude);
+ }
+
+ private void pushRows(Receiver receiver, DruidIntList[] rowsToInclude)
+ {
+ int size = rowsToInclude[0].size();
+ if (size == 0) {
+ return;
+ }
+
+ LinkedHashMap<String, Column> cols = new LinkedHashMap<>();
+
+ for (int i = 0; i < joinParts.size(); ++i) {
+ final JoinPart part = joinParts.get(i);
+ RowsAndColumns remapped =
+ new RearrangedRowsAndColumns(rowsToInclude[i].elements(), 0,
rowsToInclude[i].size(), part.curr);
+
+ for (String field : part.projectFields) {
+ cols.put(field, remapped.findColumn(field));
+ }
+ }
+
+ final Signal signal = receiver.push(new MapOfColumnsRowsAndColumns(cols,
size));
+ switch (signal) {
+ case STOP:
+ state = State.COMPLETE;
+ break;
+ case PAUSE:
+ state = State.PAUSED;
+ break;
+ case GO:
+ break;
+ default:
+ throw new ISE("Unknown state[%s]", signal);
+ }
+ }
+
+ /**
+ * Joins rows by recursively walking the joinParts backwards.
+ * <p>
+     * Populates rowsToInclude such that it represents the row mapping between
the different parts of the join. That is
+ * if there are 2 sides to the inner join and there are only 2 matches:
+ * 1) row 50 on part 0, and row 8 on part 1
+ * 2) row 8372 on part 0, and row 9 on part 1
+ * <p>
+ * Then rowsToInclude would be the equivalent of {@code new int[][]{{50,
8372}, {8, 9}}}
+ * <p>
+ * If a specific part cannot match the currently provided row, then it
will seek to the next possible row that it
+ * *could* match and return that as an alternative option. The previous
callers can then try to find that
+ * alternative row until either one of the parts is exhausted of data OR a
match is found.
+ * <p>
+ * This method returns null when there is no more work that can be done on
the current row. This could be because
+ * a match was found and results generated, or it could be because one of
the parts was exhausted and needs more.
+ *
+ * @param joinPartIndex the index of the join part that the current call
should be looking at
+ * @param rowsToInclude a mapping of rows that should be included in
results
+ * @param row the current candidate join value to match
+ * @return null if no more work can be done, non-null if there is an
alternative row that a part wants to search for
+ */
+ @Nullable
+ private SortedMatrixMaker.SortedMatrix.MatrixRow joinRows(
+ Receiver receiver,
+ int joinPartIndex,
+ DruidIntList[] rowsToInclude,
+ SortedMatrixMaker.SortedMatrix.MatrixRow row
+ )
+ {
+ final JoinPart joinPart = joinParts.get(joinPartIndex);
+
+ final FindResult findResult = joinPart.currMatrix.findRow(
+ joinPart.currRowIndex,
+ row
+ );
+
+ if (findResult.wasFound()) {
+ joinPart.currRowIndex = findResult.getStartRow();
+ joinPart.scanToRowIndex = findResult.getEndRow();
+
+ if (joinPartIndex == 0) {
+ // We have walked through all of the joinParts and have matches, so
time to add to rowsToInclude
+
+ // We have ranges in each of the parts, we will do the cartesian
product, so we will produce the product
+ // of the length of those ranges number of rows. Let's compute it
to see how many rows we will produce.
+ int numRowsExpected = joinPart.scanToRowIndex -
joinPart.currRowIndex;
+ for (int i = 1; i < joinParts.size(); ++i) {
+ final JoinPart subPart = joinParts.get(i);
+ numRowsExpected *= subPart.scanToRowIndex - subPart.currRowIndex;
+ }
+
+ if (numRowsExpected == 1) {
+ for (int i = 0; i < joinParts.size(); ++i) {
+ rowsToInclude[i].add(joinParts.get(i).currRowIndex);
+ }
+ } else {
+ if (numRowsExpected > 1_000_000) {
+ // It would be helpful to serialize the actual value out with
this error, but that risks leaking data
+ throw new IAE("Got a join, with a cartesian product that exceeds
1,000,000 rows, cannot handle it");
+ }
+
+ // The rowIds that will be used in the result of the join will be
a cartesian product, which means that
+ // the "deepest" row ids will be repeated in-order over and over,
then the next layer will have each
+          // value repeated runSize times, forming a new run of length size *
runSize, and so on and so forth
+ int partIndex = joinParts.size() - 1;
+ int runSize = 1;
+ // We are guaranteed that there is a runSize greater than 1
because otherwise numRowsExpected would be 1
+ while (partIndex >= 0) {
+ final JoinPart part = joinParts.get(partIndex);
+ final int size = part.scanToRowIndex - part.currRowIndex;
+ if (size == 1) {
+ rowsToInclude[partIndex].fill(part.currRowIndex,
numRowsExpected);
+ } else {
+ int[] vals = new int[size];
+ for (int i = 0; i < vals.length; ++i) {
+ vals[i] = i + part.currRowIndex;
+ }
+ final int newRunSize = size * runSize;
+ rowsToInclude[partIndex].fillRuns(vals, runSize,
numRowsExpected / newRunSize);
+ runSize = newRunSize;
+ }
+
+ --partIndex;
+ }
+ }
+
+ if (rowsToInclude[0].size() > config.getReleaseSize()) {
+ // Incrementally push stuff out once we've collected this number
of rows
+ pushRows(receiver, rowsToInclude);
+ if (state == State.READY) {
+ // We have more to do, so reinitialize rowsToInclude
+ for (DruidIntList intList : rowsToInclude) {
+ intList.resetToSize(config.getBufferSize());
+ }
+ }
+ }
+
+ return null;
+ } else {
+ // Continue the march through the joinParts
+ final SortedMatrixMaker.SortedMatrix.MatrixRow alternativeRow =
+ joinRows(receiver, joinPartIndex - 1, rowsToInclude, row);
+
+ if (consumeCurrMaybePush(receiver, joinPartIndex, rowsToInclude,
joinPart)) {
+ return null;
+ }
+
+ if (alternativeRow == null) {
+ return null;
+ } else {
+ // The next part didn't have the row we wanted, so let's check for
the candidate that it returned
+ final FindResult alternativeFind =
+ joinPart.currMatrix.findRow(joinPart.currRowIndex,
alternativeRow);
+
+ if (alternativeFind == null) {
+ joinPart.reinitCurr();
+ setNextPositionToLoad(joinPartIndex);
+ return null;
+ } else if (alternativeFind.wasFound()) {
+ // We update our values to make finding it again easier
+ joinPart.currRowIndex = alternativeFind.getStartRow();
+ joinPart.scanToRowIndex = alternativeFind.getEndRow();
+ return alternativeRow;
+ } else {
+ if (consumeCurrMaybePush(receiver, joinPartIndex, rowsToInclude,
joinPart)) {
+ return null;
+ } else {
+ return joinPart.currMatrix.getRow(joinPart.currRowIndex);
+ }
+ }
+ }
+ }
+ } else {
+ final int next = findResult.getNext();
+ if (next >= joinPart.currMatrix.numRows()) {
+ // the next item is beyond the current RAC, so we need to load more
data.
+ joinPart.reinitCurr();
+ setNextPositionToLoad(joinPartIndex);
+ return null;
+ }
+ // No match, so return the next possible match such that the caller
can seek to that.
+ return joinPart.currMatrix.getRow(next);
+ }
+ }
+
+ private boolean consumeCurrMaybePush(
+ Receiver receiver,
+ int joinPartIndex,
+ DruidIntList[] rowsToInclude,
+ JoinPart joinPart
+ )
+ {
+ // Consume the current range of data by jumping to the end
+ joinPart.jumpTo(joinPart.scanToRowIndex);
+ if (joinPart.needsData()) {
+ // If we need more data, we must first push out anything we've built
up so far
+ // before we drop the reference to the RAC
+ pushRows(receiver, rowsToInclude);
+ for (DruidIntList intList : rowsToInclude) {
+ intList.clear();
+ }
+ // We ran out of data, so we need to just mark that we need data and
return
+ if (joinPart.isComplete()) {
+ state = State.COMPLETE;
+ joinPart.reinitCurr();
+ } else {
+ setNextPositionToLoad(joinPartIndex);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void setNextPositionToLoad(int joinPartIndex)
+ {
+ state = State.NEEDS_DATA;
+ nextPositionToLoad = joinPartIndex;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ Closer closer = Closer.create();
+ closer.registerAll(joinParts);
+ closer.close();
+ }
+ }
+
+ private enum State
+ {
+ NEEDS_DATA,
+ COMPLETE,
+ PAUSED,
+ READY
+ }
+
+ private static class JoinPart implements Closeable
+ {
+ private final Operator op;
+ private final List<String> joinFields;
+ private final List<String> projectFields;
+ private final AtomicBoolean complete;
+
+ private RowsAndColumns curr;
+ private SortedMatrixMaker.SortedMatrix currMatrix;
+ private int currRowIndex;
+ private int scanToRowIndex;
+
+ private Closeable continuation;
+
+ private JoinPart(
+ Operator op,
+ List<String> joinFields,
+ List<String> projectFields
+ )
+ {
+ this.op = op;
+ this.joinFields = joinFields;
+ this.projectFields = projectFields;
+
+ complete = new AtomicBoolean(false);
+ reinitCurr();
+ continuation = null;
+ }
+
+ public void setCurr(RowsAndColumns rac)
+ {
+ //noinspection VariableNotUsedInsideIf
+ if (curr != null) {
+ throw new ISE("Asked to setCurr even though it was not null!?");
+ }
+
+ curr = rac;
+ currMatrix = SortedMatrixMaker.fromRAC(rac).make(joinFields);
+ jumpTo(0);
+ }
+
+ public boolean needsData()
+ {
+ return curr == null || currRowIndex >= currMatrix.numRows();
+ }
+
+ public boolean isComplete()
+ {
+ return complete.get();
+ }
+
+ public void goOrContinue(Receiver receiver)
+ {
+ Closeable theContinuation = continuation;
+ continuation = null;
+ continuation = op.goOrContinue(theContinuation, receiver);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (continuation != null) {
+ continuation.close();
+ }
+ }
+
+ /**
+ * @param rowIndex the rowIndex to jump to
+ */
+ public void jumpTo(int rowIndex)
+ {
+ currRowIndex = rowIndex;
+ scanToRowIndex = -1;
+ }
+
+ public void reinitCurr()
+ {
+ curr = null;
+ currMatrix = null;
+ currRowIndex = -1;
+ scanToRowIndex = -1;
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
similarity index 89%
rename from
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
rename to
processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
index bc4cd5206c..ea44001d50 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/window/WindowOperatorFactory.java
@@ -17,12 +17,14 @@
* under the License.
*/
-package org.apache.druid.query.operator;
+package org.apache.druid.query.operator.window;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.query.operator.window.Processor;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowProcessorOperator;
public class WindowOperatorFactory implements OperatorFactory
{
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index e320ac1999..6ce7769ac9 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -22,14 +22,24 @@ package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
+import org.apache.druid.segment.column.ColumnType;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
public class MapOfColumnsRowsAndColumns implements RowsAndColumns
{
+ public static MapOfColumnsRowsAndColumns.Builder builder()
+ {
+ return new Builder();
+ }
+
public static MapOfColumnsRowsAndColumns of(String name, Column col)
{
return fromMap(ImmutableMap.of(name, col));
@@ -109,4 +119,40 @@ public class MapOfColumnsRowsAndColumns implements
RowsAndColumns
return null;
}
+ public static class Builder
+ {
+ public LinkedHashMap<String, Column> cols = new LinkedHashMap<>();
+
+ public Builder add(String name, int[] vals)
+ {
+ return add(name, new IntArrayColumn(vals));
+ }
+
+ public Builder add(String name, double[] vals)
+ {
+ return add(name, new DoubleArrayColumn(vals));
+ }
+
+ public Builder add(String name, ColumnType type, Object... vals)
+ {
+ return add(name, vals, type);
+ }
+
+ public Builder add(String name, Object[] vals, ColumnType type)
+ {
+ return add(name, new ObjectArrayColumn(vals, type));
+ }
+
+ public Builder add(String name, Column col)
+ {
+ cols.put(name, col);
+ return this;
+ }
+
+ public MapOfColumnsRowsAndColumns build()
+ {
+ return MapOfColumnsRowsAndColumns.fromMap(cols);
+ }
+ }
+
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
index 3476bab60c..f1793f8fd0 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
@@ -49,17 +49,30 @@ public class RearrangedRowsAndColumns implements
RowsAndColumns
private final int[] pointers;
private final RowsAndColumns rac;
+ private final int start;
+ private final int end;
public RearrangedRowsAndColumns(
int[] pointers,
RowsAndColumns rac
)
{
- if (pointers.length != rac.numRows()) {
- throw new IAE("length mismatch, pointers[%,d], rac[%,d]",
pointers.length, rac.numRows());
- }
+ this(pointers, 0, pointers.length, rac);
+ }
+ public RearrangedRowsAndColumns(
+ int[] pointers,
+ int start,
+ int end,
+ RowsAndColumns rac
+ )
+ {
+ if (end - start < 0 || end > pointers.length) {
+ throw new IAE("end[%,d] - start[%,d] was invalid!?
pointers.length[%,d]", end, start, pointers.length);
+ }
this.pointers = pointers;
+ this.start = start;
+ this.end = end;
this.rac = rac;
}
@@ -72,13 +85,14 @@ public class RearrangedRowsAndColumns implements
RowsAndColumns
@Override
public int numRows()
{
- return pointers.length;
+ return end - start;
}
@Override
@Nullable
public Column findColumn(String name)
{
+ // We do a containsKey here so that we can negative-cache nulls.
if (columnCache.containsKey(name)) {
return columnCache.get(name);
} else {
@@ -101,50 +115,50 @@ public class RearrangedRowsAndColumns implements
RowsAndColumns
@Override
public int numRows()
{
- return pointers.length;
+ return end - start;
}
@Override
public boolean isNull(int rowNum)
{
- return accessor.isNull(pointers[rowNum]);
+ return accessor.isNull(pointers[start + rowNum]);
}
@Nullable
@Override
public Object getObject(int rowNum)
{
- return accessor.getObject(pointers[rowNum]);
+ return accessor.getObject(pointers[start + rowNum]);
}
@Override
public double getDouble(int rowNum)
{
- return accessor.getDouble(pointers[rowNum]);
+ return accessor.getDouble(pointers[start + rowNum]);
}
@Override
public float getFloat(int rowNum)
{
- return accessor.getFloat(pointers[rowNum]);
+ return accessor.getFloat(pointers[start + rowNum]);
}
@Override
public long getLong(int rowNum)
{
- return accessor.getLong(pointers[rowNum]);
+ return accessor.getLong(pointers[start + rowNum]);
}
@Override
public int getInt(int rowNum)
{
- return accessor.getInt(pointers[rowNum]);
+ return accessor.getInt(pointers[start + rowNum]);
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
- return accessor.compareRows(pointers[lhsRowNum],
pointers[rhsRowNum]);
+ return accessor.compareRows(pointers[lhsRowNum], pointers[start
+ rhsRowNum]);
}
}
);
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
new file mode 100644
index 0000000000..4eddcc77f1
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/BinarySearchableAccessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.column;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+public interface BinarySearchableAccessor extends ColumnAccessor
+{
+ static BinarySearchableAccessor fromColumn(Column col)
+ {
+ final BinarySearchableAccessor retVal =
col.as(BinarySearchableAccessor.class);
+
+ if (retVal == null) {
+ final ColumnAccessor accessor = col.toAccessor();
+ if (accessor instanceof BinarySearchableAccessor) {
+        // The Column did not advertise the accessor via as(), but its
ColumnAccessor implements the interface anyway, so use it directly.
+ return (BinarySearchableAccessor) accessor;
+ }
+
+ throw new ISE("col[%s] is a no-go", col.getClass());
+ }
+ return retVal;
+ }
+
+ FindResult findNull(int startIndex, int endIndex);
+
+ FindResult findDouble(int startIndex, int endIndex, double val);
+
+ FindResult findFloat(int startIndex, int endIndex, float val);
+
+ FindResult findLong(int startIndex, int endIndex, long val);
+
+ FindResult findString(int startIndex, int endIndex, String val);
+
+ FindResult findComplex(int startIndex, int endIndex, Object val);
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
index 9f5959f185..4e84c398a8 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessor.java
@@ -24,7 +24,7 @@ import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
/**
- * Allows for accessing a column, provides methods to enable cell-by-cell
access.
+ * Allows for accessing a column, provides methods to enable row-by-row access
of a specific column.
*/
public interface ColumnAccessor
{
@@ -36,69 +36,69 @@ public interface ColumnAccessor
ColumnType getType();
/**
- * Get the number of cells
+ * Get the number of rows
*
- * @return the number of cells
+ * @return the number of rows
*/
int numRows();
/**
- * Get whether the value of a cell is null
+ * Get whether the value of a row is null
*
- * @param rowNum the cell id, 0-indexed
+ * @param rowNum the row id, 0-indexed
* @return true if the value is null
*/
boolean isNull(int rowNum);
/**
- * Get the {@link Object} representation of the cell.
+ * Get the {@link Object} representation of the row.
*
- * @param rowNum the cell id, 0-indexed
- * @return the {@link Object} representation of the cell. Returns {@code
null} If {@link #isNull} is true.
+ * @param rowNum the row id, 0-indexed
+ * @return the {@link Object} representation of the row. Returns {@code
null} If {@link #isNull} is true.
*/
@Nullable
Object getObject(int rowNum);
/**
- * Get the primitive {@code double} representation of the cell.
+ * Get the primitive {@code double} representation of the row.
*
- * @param rowNum the cell id, 0-indexed
- * @return the primitive {@code double} representation of the cell. Returns
{@code 0D} If {@link #isNull} is true.
+ * @param rowNum the row id, 0-indexed
+ * @return the primitive {@code double} representation of the row. Returns
{@code 0D} If {@link #isNull} is true.
*/
double getDouble(int rowNum);
/**
- * Get the primitive {@code float} representation of the cell.
+ * Get the primitive {@code float} representation of the row.
*
- * @param rowNum the cell id, 0-indexed
- * @return the primitive {@code float} representation of the cell. Returns
{@code 0F} If {@link #isNull} is true.
+ * @param rowNum the row id, 0-indexed
+ * @return the primitive {@code float} representation of the row. Returns
{@code 0F} If {@link #isNull} is true.
*/
float getFloat(int rowNum);
/**
- * Get the primitive {@code long} representation of the cell.
+ * Get the primitive {@code long} representation of the row.
*
- * @param rowNum the cell id, 0-indexed
- * @return the primitive {@code long} representation of the cell. Returns
{@code 0L} If {@link #isNull} is true.
+ * @param rowNum the row id, 0-indexed
+ * @return the primitive {@code long} representation of the row. Returns
{@code 0L} If {@link #isNull} is true.
*/
long getLong(int rowNum);
/**
- * Get the primitive {@code int} representation of the cell.
+ * Get the primitive {@code int} representation of the row.
*
- * @param rowNum the cell id, 0-indexed
- * @return the primitive {@code int} representation of the cell. Returns
{@code 0} If {@link #isNull} is true.
+ * @param rowNum the row id, 0-indexed
+ * @return the primitive {@code int} representation of the row. Returns
{@code 0} If {@link #isNull} is true.
*/
int getInt(int rowNum);
/**
- * Compares two cells using a comparison that follows the same semantics as
{@link java.util.Comparator#compare}
+ * Compares two rows using a comparison that follows the same semantics as
{@link java.util.Comparator#compare}
* <p>
- * This is not comparing the cell Ids, but the values referred to by the
cell ids.
+ * This is not comparing the row Ids, but the values referred to by the row
ids.
*
- * @param lhsRowNum the cell id of the left-hand-side of the comparison
- * @param rhsRowNum the cell id of the right-hand-side of the comparison
- * @return the result of the comparison of the two cells
+ * @param lhsRowNum the row id of the left-hand-side of the comparison
+ * @param rhsRowNum the row id of the right-hand-side of the comparison
+ * @return the result of the comparison of the two rows
*/
int compareRows(int lhsRowNum, int rhsRowNum);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
index d054472f3d..28a7c3dd10 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
@@ -43,62 +44,7 @@ public class ConstantObjectColumn implements Column
@Override
public ColumnAccessor toAccessor()
{
- return new ColumnAccessor()
- {
- @Override
- public ColumnType getType()
- {
- return type;
- }
-
- @Override
- public int numRows()
- {
- return numRows;
- }
-
- @Override
- public boolean isNull(int rowNum)
- {
- return obj == null;
- }
-
- @Override
- public Object getObject(int rowNum)
- {
- return obj;
- }
-
- @Override
- public double getDouble(int rowNum)
- {
- return ((Number) obj).doubleValue();
- }
-
- @Override
- public float getFloat(int rowNum)
- {
- return ((Number) obj).floatValue();
- }
-
- @Override
- public long getLong(int rowNum)
- {
- return ((Number) obj).longValue();
- }
-
- @Override
- public int getInt(int rowNum)
- {
- return ((Number) obj).intValue();
- }
-
- @Override
- public int compareRows(int lhsRowNum, int rhsRowNum)
- {
- return 0;
- }
- };
+ return new ConstantColumnAccessor();
}
@Nullable
@@ -121,4 +67,104 @@ public class ConstantObjectColumn implements Column
return null;
}
+
+ private class ConstantColumnAccessor implements BinarySearchableAccessor
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return type;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return numRows;
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ return obj == null;
+ }
+
+ @Override
+ public Object getObject(int rowNum)
+ {
+ return obj;
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return ((Number) obj).doubleValue();
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return ((Number) obj).floatValue();
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return ((Number) obj).longValue();
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return ((Number) obj).intValue();
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ return 0;
+ }
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return findComplex(startIndex, endIndex, null);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ final boolean same;
+ if (obj == null) {
+ same = val == null;
+ } else {
+ same = obj.equals(val);
+ }
+
+ return same ? FindResult.found(startIndex, endIndex) :
FindResult.notFound(endIndex);
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
index c87e69f500..9c3b799d30 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java
@@ -20,10 +20,13 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
public class DoubleArrayColumn implements Column
{
@@ -40,62 +43,7 @@ public class DoubleArrayColumn implements Column
@Override
public ColumnAccessor toAccessor()
{
- return new ColumnAccessor()
- {
- @Override
- public ColumnType getType()
- {
- return ColumnType.DOUBLE;
- }
-
- @Override
- public int numRows()
- {
- return vals.length;
- }
-
- @Override
- public boolean isNull(int rowNum)
- {
- return false;
- }
-
- @Override
- public Object getObject(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public double getDouble(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public float getFloat(int rowNum)
- {
- return (float) vals[rowNum];
- }
-
- @Override
- public long getLong(int rowNum)
- {
- return (long) vals[rowNum];
- }
-
- @Override
- public int getInt(int rowNum)
- {
- return (int) vals[rowNum];
- }
-
- @Override
- public int compareRows(int lhsRowNum, int rhsRowNum)
- {
- return Double.compare(lhsRowNum, rhsRowNum);
- }
- };
+ return new MyColumnAccessor();
}
@Nullable
@@ -126,4 +74,122 @@ public class DoubleArrayColumn implements Column
}
return null;
}
+
+ private class MyColumnAccessor implements BinarySearchableAccessor
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.DOUBLE;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return vals.length;
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ return false;
+ }
+
+ @Override
+ public Object getObject(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return (float) vals[rowNum];
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return (long) vals[rowNum];
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return (int) vals[rowNum];
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ return Double.compare(lhsRowNum, rhsRowNum);
+ }
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ if (vals[startIndex] == val) {
+ int end = startIndex + 1;
+
+ while (end < endIndex && vals[end] == val) {
+ ++end;
+ }
+ return FindResult.found(startIndex, end);
+ }
+
+ int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
+ if (i > 0) {
+ int foundStart = i;
+ int foundEnd = i + 1;
+
+ while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
+ --foundStart;
+ }
+
+ while (foundEnd < endIndex && vals[foundEnd] == val) {
+ ++foundEnd;
+ }
+
+ return FindResult.found(foundStart, foundEnd);
+ } else {
+ return FindResult.notFound(-(i + 1));
+ }
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return findDouble(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ return findDouble(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ return findDouble(startIndex, endIndex, Numbers.tryParseDouble(val, 0));
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
index ee5a7d7604..673cebf0e2 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java
@@ -20,10 +20,13 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Numbers;
+import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
public class IntArrayColumn implements Column
{
@@ -40,62 +43,7 @@ public class IntArrayColumn implements Column
@Override
public ColumnAccessor toAccessor()
{
- return new ColumnAccessor()
- {
- @Override
- public ColumnType getType()
- {
- return ColumnType.LONG;
- }
-
- @Override
- public int numRows()
- {
- return vals.length;
- }
-
- @Override
- public boolean isNull(int rowNum)
- {
- return false;
- }
-
- @Override
- public Object getObject(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public double getDouble(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public float getFloat(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public long getLong(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public int getInt(int rowNum)
- {
- return vals[rowNum];
- }
-
- @Override
- public int compareRows(int lhsRowNum, int rhsRowNum)
- {
- return Integer.compare(vals[lhsRowNum], vals[rhsRowNum]);
- }
- };
+ return new MyColumnAccessor();
}
@Nullable
@@ -126,4 +74,128 @@ public class IntArrayColumn implements Column
}
return null;
}
+
+ private class MyColumnAccessor implements BinarySearchableAccessor
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.LONG;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return vals.length;
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ return false;
+ }
+
+ @Override
+ public Object getObject(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return vals[rowNum];
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ return Integer.compare(vals[lhsRowNum], vals[rhsRowNum]);
+ }
+
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ return findInt(startIndex, endIndex, (int) val);
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return findInt(startIndex, endIndex, (int) val);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ return findInt(startIndex, endIndex, (int) val);
+ }
+
+ public FindResult findInt(int startIndex, int endIndex, int val)
+ {
+ if (vals[startIndex] == val) {
+ int end = startIndex + 1;
+
+ while (end < endIndex && vals[end] == val) {
+ ++end;
+ }
+ return FindResult.found(startIndex, end);
+ }
+
+ int i = Arrays.binarySearch(vals, startIndex, endIndex, val);
+ if (i > 0) {
+ int foundStart = i;
+ int foundEnd = i + 1;
+
+ while (foundStart - 1 >= startIndex && vals[foundStart - 1] == val) {
+ --foundStart;
+ }
+
+ while (foundEnd < endIndex && vals[foundEnd] == val) {
+ ++foundEnd;
+ }
+
+ return FindResult.found(foundStart, foundEnd);
+ } else {
+ return FindResult.notFound(-(i + 1));
+ }
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ return findInt(startIndex, endIndex, (int) Numbers.tryParseLong(val, 0));
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ return findDouble(startIndex, endIndex, (int) Numbers.tryParseLong(val,
0));
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
index 73823534be..36a5046c1f 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.rowsandcols.column;
+import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
@@ -52,7 +53,7 @@ public class NullColumn implements Column
return null;
}
- public static class Accessor implements ColumnAccessor
+ public static class Accessor implements BinarySearchableAccessor
{
private final ColumnType type;
private final int size;
@@ -117,5 +118,41 @@ public class NullColumn implements Column
{
return 0;
}
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return FindResult.found(startIndex, endIndex);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ return FindResult.notFound(endIndex);
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ return FindResult.notFound(endIndex);
+ }
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
index 46d875286a..211c176902 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java
@@ -19,10 +19,12 @@
package org.apache.druid.query.rowsandcols.column;
+import org.apache.druid.query.rowsandcols.util.FindResult;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Comparator;
public class ObjectArrayColumn implements Column
@@ -59,32 +61,7 @@ public class ObjectArrayColumn implements Column
@Override
public ColumnAccessor toAccessor()
{
- return new ObjectColumnAccessorBase()
- {
- @Override
- protected Object getVal(int rowNum)
- {
- return objects[rowNum];
- }
-
- @Override
- protected Comparator<Object> getComparator()
- {
- return comparator;
- }
-
- @Override
- public ColumnType getType()
- {
- return resultType;
- }
-
- @Override
- public int numRows()
- {
- return objects.length;
- }
- };
+ return new MyColumnAccessor();
}
@Nullable
@@ -105,4 +82,91 @@ public class ObjectArrayColumn implements Column
return null;
}
+ private class MyColumnAccessor extends ObjectColumnAccessorBase implements
BinarySearchableAccessor
+ {
+ @Override
+ protected Object getVal(int rowNum)
+ {
+ return objects[rowNum];
+ }
+
+ @Override
+ protected Comparator<Object> getComparator()
+ {
+ return comparator;
+ }
+
+ @Override
+ public ColumnType getType()
+ {
+ return resultType;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return objects.length;
+ }
+
+ @Override
+ public FindResult findNull(int startIndex, int endIndex)
+ {
+ return findComplex(startIndex, endIndex, null);
+ }
+
+ @Override
+ public FindResult findDouble(int startIndex, int endIndex, double val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findFloat(int startIndex, int endIndex, float val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findLong(int startIndex, int endIndex, long val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findString(int startIndex, int endIndex, String val)
+ {
+ return findComplex(startIndex, endIndex, val);
+ }
+
+ @Override
+ public FindResult findComplex(int startIndex, int endIndex, Object val)
+ {
+ if (comparator.compare(objects[startIndex], val) == 0) {
+ int end = startIndex + 1;
+
+ while (end < endIndex && comparator.compare(objects[end], val) == 0) {
+ ++end;
+ }
+ return FindResult.found(startIndex, end);
+ }
+
+ int i = Arrays.binarySearch(objects, startIndex, endIndex, val,
comparator);
+ if (i > 0) {
+ int foundStart = i;
+ int foundEnd = i + 1;
+
+ while (foundStart - 1 >= startIndex &&
comparator.compare(objects[foundStart - 1], val) == 0) {
+ --foundStart;
+ }
+
+ while (foundEnd < endIndex && comparator.compare(objects[foundEnd],
val) == 0) {
+ ++foundEnd;
+ }
+
+ return FindResult.found(foundStart, foundEnd);
+ } else {
+ return FindResult.notFound(-(i + 1));
+ }
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
new file mode 100644
index 0000000000..d08b6427bc
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultSortedMatrixMaker.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.BinarySearchableAccessor;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The DefaultSortedMatrixMaker is a SortedMatrixMaker that works on the
generic RowsAndColumns interface.
+ * <p>
+ * It does not validate that things are sorted, it just assumes that they must
be. As such, behavior are undefined
+ * when running on top of a RowsAndColumns that is not actually sorted by the
columns passed into {@link #make}
+ */
+public class DefaultSortedMatrixMaker implements SortedMatrixMaker
+{
+ private final RowsAndColumns rac;
+
+ public DefaultSortedMatrixMaker(
+ RowsAndColumns rac
+ )
+ {
+ this.rac = rac;
+ }
+
+ @Override
+ public SortedMatrix make(List<String> columns)
+ {
+ ArrayList<BinarySearchableAccessor> soughtColumns = new
ArrayList<>(columns.size());
+ for (String column : columns) {
+ // Note, this can add `null` to the list. That's intentional,
iterations over this list must deal
+ // with null entries meaning that the column doesn't exist
+ final Column racColumn = rac.findColumn(column);
+ if (racColumn == null) {
+ soughtColumns.add(null);
+ } else {
+ soughtColumns.add(BinarySearchableAccessor.fromColumn(racColumn));
+ }
+ }
+
+ return new SortedMatrix()
+ {
+ @Override
+ public int numRows()
+ {
+ return rac.numRows();
+ }
+
+ @Override
+ public MatrixRow getRow(int rowId)
+ {
+ return new MatrixRow()
+ {
+ @Override
+ public int length()
+ {
+ return soughtColumns.size();
+ }
+
+ @Override
+ public boolean isNull(int columnId)
+ {
+ final BinarySearchableAccessor column =
soughtColumns.get(columnId);
+ if (column == null) {
+ return true;
+ } else {
+ return column.isNull(rowId);
+ }
+ }
+
+ @Override
+ public Object getObject(int columnId)
+ {
+ final BinarySearchableAccessor column =
soughtColumns.get(columnId);
+ if (column == null) {
+ return null;
+ } else {
+ return column.getObject(rowId);
+ }
+ }
+
+ @Override
+ public double getDouble(int columnId)
+ {
+ final BinarySearchableAccessor column =
soughtColumns.get(columnId);
+ if (column == null) {
+ return 0;
+ } else {
+ return column.getDouble(rowId);
+ }
+ }
+
+ @Override
+ public float getFloat(int columnId)
+ {
+ final BinarySearchableAccessor column =
soughtColumns.get(columnId);
+ if (column == null) {
+ return 0;
+ } else {
+ return column.getFloat(rowId);
+ }
+ }
+
+ @Override
+ public long getLong(int columnId)
+ {
+ final BinarySearchableAccessor column =
soughtColumns.get(columnId);
+ if (column == null) {
+ return 0;
+ } else {
+ return column.getLong(rowId);
+ }
+ }
+ };
+ }
+
+ @Override
+ public FindResult findRow(
+ int startRowIndex,
+ MatrixRow row
+ )
+ {
+ int start = startRowIndex;
+ int end = numRows();
+
+ for (int i = 0; i < row.length(); ++i) {
+ final BinarySearchableAccessor searcher = soughtColumns.get(i);
+ if (searcher == null) {
+ if (row.isNull(i)) {
+ continue;
+ } else {
+ // We don't have the column at all, that means that we can only
match `null`. Given that the data
+ // is sorted, the next possible value is the next value of the
previous column, so return that as our
+ // end.
+ return FindResult.notFound(end);
+ }
+ }
+
+ final FindResult result;
+ if (row.isNull(i)) {
+ result = searcher.findNull(start, end);
+ } else {
+ switch (searcher.getType().getType()) {
+ case STRING:
+ result = searcher.findString(start, end, (String)
row.getObject(i));
+ break;
+ case LONG:
+ if (row.isNull(i)) {
+ result = searcher.findNull(start, end);
+ } else {
+ result = searcher.findLong(start, end, row.getLong(i));
+ }
+ break;
+ case DOUBLE:
+ result = searcher.findDouble(start, end, row.getDouble(i));
+ break;
+ case FLOAT:
+ result = searcher.findFloat(start, end, row.getFloat(i));
+ break;
+
+ case ARRAY:
+ case COMPLEX:
+ result = searcher.findComplex(start, end, row.getObject(i));
+ break;
+ default:
+ throw new RE("Unknown type[%s]", searcher.getType());
+ }
+ }
+
+ if (result.wasFound()) {
+ start = result.getStartRow();
+ end = result.getEndRow();
+ } else {
+ return FindResult.notFound(result.getNext());
+ }
+ }
+
+ return FindResult.found(start, end);
+ }
+ };
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
new file mode 100644
index 0000000000..2e610b12ee
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/SortedMatrixMaker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.semantic;
+
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.util.FindResult;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
/**
 * A thing that makes matrices over sorted data.
 *
 * This semantic interface was created in support of the SortedInnerJoinOperator, which was created as an attempt
 * to ensure that the pausable Operators actually enable us to merge independent streams of data. As such, thought
 * has gone into it only to the extent that it was necessary to make something that works; it's entirely possible
 * that it's a pointless interface and should be re-evaluated when/if the join implementation is dusted back off.
 */
public interface SortedMatrixMaker
{
  /**
   * Wraps a RowsAndColumns as a SortedMatrixMaker, preferring the RowsAndColumns' own
   * implementation (via {@code as}) and falling back to {@link DefaultSortedMatrixMaker}.
   */
  static SortedMatrixMaker fromRAC(RowsAndColumns rac)
  {
    SortedMatrixMaker retVal = rac.as(SortedMatrixMaker.class);
    if (retVal == null) {
      return new DefaultSortedMatrixMaker(rac);
    }
    return retVal;
  }

  /**
   * Builds a SortedMatrix over the given columns; the data is assumed (not validated)
   * to already be sorted by those columns.
   */
  SortedMatrix make(List<String> columns);

  /**
   * A matrix of rows that is assumed to be sorted, supporting binary-search-style row lookup.
   */
  interface SortedMatrix
  {
    int numRows();

    MatrixRow getRow(int rowId);

    /**
     * @param startRowIndex the rowIndex to start the search from. The return value should never be less than this
     *                      value
     * @param row           the row to compare against
     * @return a FindResult describing the matching [start, end) span, or a not-found result whose
     *         "next" index indicates where searching should resume
     */
    FindResult findRow(int startRowIndex, MatrixRow row);

    /**
     * A single row of the matrix, addressed by column position.
     */
    interface MatrixRow
    {
      int length();

      boolean isNull(int columnId);

      @Nullable
      Object getObject(int columnId);

      double getDouble(int columnId);

      float getFloat(int columnId);

      long getLong(int columnId);
    }
  }

}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
new file mode 100644
index 0000000000..248734a18f
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/util/FindResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.util;
+
/**
 * Result of a search over a sorted range of rows.  Either a match with a half-open
 * [startRow, endRow) span, or a miss carrying the next row index at which a caller
 * should resume scanning.
 */
public class FindResult
{
  private final boolean matched;
  private final int start;
  private final int end;

  private FindResult(
      int start,
      int end,
      boolean matched
  )
  {
    this.start = start;
    this.end = end;
    this.matched = matched;
  }

  public static FindResult found(int startRow, int endRow)
  {
    return new FindResult(startRow, endRow, true);
  }

  public static FindResult notFound(int nextRow)
  {
    // The start slot does double duty as the "next row" index; endRow is a sentinel.
    return new FindResult(nextRow, -1, false);
  }

  public boolean wasFound()
  {
    return matched;
  }

  public int getStartRow()
  {
    return start;
  }

  public int getEndRow()
  {
    return end;
  }

  public int getNext()
  {
    // Only meaningful for not-found results, where start holds the resume index.
    return start;
  }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
index f482cf04f4..edd759a475 100644
---
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
+++
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -41,5 +42,5 @@ public interface CloseableShapeshifter extends Closeable
* through a local implementation of the interface.
*/
@Nullable
- <T> T as(Class<T> clazz);
+ <T> T as(@Nonnull Class<T> clazz);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
index b4c52e1950..cd515deadb 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
@@ -24,7 +24,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
public class ExceptionalReceiver implements Operator.Receiver
{
@Override
- public boolean push(RowsAndColumns rac)
+ public Operator.Signal push(RowsAndColumns rac)
{
throw new UnsupportedOperationException();
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
index 1d7b68887b..ac9e9e1aa3 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
@@ -20,41 +20,70 @@
package org.apache.druid.query.operator;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import java.io.Closeable;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class InlineScanOperator implements Operator
{
- public static InlineScanOperator make(RowsAndColumns item)
+ public static InlineScanOperator make(RowsAndColumns... item)
{
- return new InlineScanOperator(Iterators.singletonIterator(item));
+ return new InlineScanOperator(Arrays.asList(item));
}
public static InlineScanOperator make(List<RowsAndColumns> items)
{
- return new InlineScanOperator(items.iterator());
+ return new InlineScanOperator(items);
}
- private Iterator<RowsAndColumns> iter;
+ private Iterable<RowsAndColumns> iterable;
public InlineScanOperator(
- Iterator<RowsAndColumns> iter
+ Iterable<RowsAndColumns> iterable
)
{
- Preconditions.checkNotNull(iter);
- this.iter = iter;
+ Preconditions.checkNotNull(iterable);
+ this.iterable = iterable;
}
@Override
- public void go(Receiver receiver)
+ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
{
- boolean keepItGoing = true;
- while (keepItGoing && iter.hasNext()) {
+ final Iterator<RowsAndColumns> iter;
+ if (continuation == null) {
+ iter = iterable.iterator();
+ } else {
+ iter = ((Continuation) continuation).iter;
+ }
+
+ Signal keepItGoing = Signal.GO;
+ while (keepItGoing == Signal.GO && iter.hasNext()) {
keepItGoing = receiver.push(iter.next());
}
- receiver.completed();
+ if (keepItGoing == Signal.PAUSE && iter.hasNext()) {
+ return new Continuation(iter);
+ } else {
+ receiver.completed();
+ return null;
+ }
+ }
+
+ private static class Continuation implements Closeable
+ {
+ private final Iterator<RowsAndColumns> iter;
+
+ public Continuation(Iterator<RowsAndColumns> iter)
+ {
+ this.iter = iter;
+ }
+
+ @Override
+ public void close()
+ {
+ // We don't actually have anything to close
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
index 875ef52bd3..ffea250f6a 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
@@ -24,14 +24,16 @@ import
org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
+@SuppressWarnings("ConstantConditions")
public class OperatorSequenceTest
{
@Test
- public void testAccumulateButNoYielder()
+ public void testAccumulateAndYielderJustOne()
{
OperatorSequence seq = new OperatorSequence(
() -> InlineScanOperator.make(MapOfColumnsRowsAndColumns.of("hi", new
IntArrayColumn(new int[]{1})))
@@ -52,33 +54,114 @@ public class OperatorSequenceTest
).intValue()
);
- boolean exceptionThrown = false;
- try {
- Yielder<Integer> yielder = seq.toYielder(0, new
YieldingAccumulator<Integer, RowsAndColumns>()
+ Yielder<Integer> yielder = seq.toYielder(0, new
YieldingAccumulator<Integer, RowsAndColumns>()
+ {
+ @Override
+ public Integer accumulate(Integer accumulated, RowsAndColumns in)
{
- @Override
- public Integer accumulate(Integer accumulated, RowsAndColumns in)
- {
- Assert.fail("This should never be called, because we expect a UOE
before this point");
+ this.yield();
+ helper.validate(in);
+ return accumulated + 1;
+ }
+ });
+
+ Assert.assertFalse(yielder.isDone());
+ Assert.assertEquals(1, yielder.get().intValue());
+
+ yielder = yielder.next(0);
+ Assert.assertTrue(yielder.isDone());
+ }
+
+ @Test
+ public void testAccumulateAndYielderMultiple()
+ {
+ OperatorSequence seq = new OperatorSequence(
+ () -> InlineScanOperator.make(
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{1})),
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{2})),
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{3, 4})),
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{5, 6, 7, 8})),
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{9, 10, 11})),
+ MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new
int[]{12, 13, 14, 15}))
+ )
+ );
+
+ Assert.assertEquals(
+ 120,
+ seq.accumulate(
+ 0,
+ (accumulated, in) -> {
+ final ColumnAccessor col = in.findColumn("hi").toAccessor();
+ for (int i = 0; i < col.numRows(); ++i) {
+ accumulated += col.getInt(i);
+ }
+ return accumulated;
+ }
+ ).intValue()
+ );
+
+ // Never yield
+ Yielder<Integer> yielder = seq.toYielder(0, new
YieldingAccumulator<Integer, RowsAndColumns>()
+ {
+ @Override
+ public Integer accumulate(Integer accumulated, RowsAndColumns in)
+ {
+ final ColumnAccessor col = in.findColumn("hi").toAccessor();
+ for (int i = 0; i < col.numRows(); ++i) {
+ accumulated += col.getInt(i);
+ }
+ return accumulated;
+ }
+ });
+
+ Assert.assertEquals(120, yielder.get().intValue());
+ Assert.assertTrue(yielder.isDone());
+
+ // Yield at the very end...
+ yielder = seq.toYielder(0, new YieldingAccumulator<Integer,
RowsAndColumns>()
+ {
+ @Override
+ public Integer accumulate(Integer accumulated, RowsAndColumns in)
+ {
+ final ColumnAccessor col = in.findColumn("hi").toAccessor();
+ for (int i = 0; i < col.numRows(); ++i) {
+ accumulated += col.getInt(i);
+ }
+ if (accumulated == 120) {
this.yield();
- helper.validate(in);
- return accumulated + 1;
}
- });
+ return accumulated;
+ }
+ });
- // The exception will have been thrown before this point, in which case
one might wonder why the code here
- // remains. It is because this code is a correct validation of what
should happen if OperatorSequence *did*
- // implement the Yielder. It's kept for posterity in case we ever
choose to implement it using threads.
- Assert.assertFalse(yielder.isDone());
- Assert.assertEquals(1, yielder.get().intValue());
+ Assert.assertEquals(120, yielder.get().intValue());
+ Assert.assertFalse(yielder.isDone());
+ yielder = yielder.next(0);
+ Assert.assertTrue(yielder.isDone());
+
+ // Aggregate each RAC and yield.
+ yielder = seq.toYielder(0, new YieldingAccumulator<Integer,
RowsAndColumns>()
+ {
+ @Override
+ public Integer accumulate(Integer accumulated, RowsAndColumns in)
+ {
+ this.yield();
+ final ColumnAccessor col = in.findColumn("hi").toAccessor();
+ for (int i = 0; i < col.numRows(); ++i) {
+ accumulated += col.getInt(i);
+ }
+ return accumulated;
+ }
+ });
+
+ int[] expectedTotals = new int[]{1, 2, 7, 26, 30, 54};
+
+ for (int expectedTotal : expectedTotals) {
+ Assert.assertEquals(expectedTotal, yielder.get().intValue());
+ Assert.assertFalse(yielder.isDone());
yielder = yielder.next(0);
- Assert.assertTrue(yielder.isDone());
- }
- catch (UnsupportedOperationException ex) {
- Assert.assertEquals("Cannot convert an Operator to a Yielder",
ex.getMessage());
- exceptionThrown = true;
}
- Assert.assertTrue(exceptionThrown);
+ Assert.assertTrue(yielder.isDone());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
index b1bace9aa3..e44bdc5806 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
@@ -42,10 +42,10 @@ public class OperatorTestHelper
int index = 0;
@Override
- public boolean push(RowsAndColumns rac)
+ public Operator.Signal push(RowsAndColumns rac)
{
helpers[index++].validate(rac);
- return true;
+ return Operator.Signal.GO;
}
}
).withFinalValidation(
@@ -61,10 +61,10 @@ public class OperatorTestHelper
int index = 0;
@Override
- public boolean push(RowsAndColumns rac)
+ public Operator.Signal push(RowsAndColumns rac)
{
helpers[index++].validate(rac);
- return index < helpers.length;
+ return index < helpers.length ? Operator.Signal.GO :
Operator.Signal.STOP;
}
}
).withFinalValidation(
@@ -103,7 +103,7 @@ public class OperatorTestHelper
public OperatorTestHelper runToCompletion(Operator op)
{
TestReceiver receiver = this.receiverSupply.get();
- op.go(receiver);
+ Operator.go(op, receiver);
Assert.assertTrue(receiver.isCompleted());
if (finalValidation != null) {
finalValidation.accept(receiver);
@@ -113,7 +113,7 @@ public class OperatorTestHelper
public interface JustPushMe
{
- boolean push(RowsAndColumns rac);
+ Operator.Signal push(RowsAndColumns rac);
}
public static class TestReceiver implements Operator.Receiver
@@ -129,7 +129,7 @@ public class OperatorTestHelper
}
@Override
- public boolean push(RowsAndColumns rac)
+ public Operator.Signal push(RowsAndColumns rac)
{
numPushed.incrementAndGet();
return pushFn.push(rac);
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
index 0cd1f1b588..f943f31201 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -78,13 +79,14 @@ public class SegmentToRowsAndColumnsOperatorTest
SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
Assert.assertEquals(CloseableShapeshifter.class, aClass);
+ //noinspection ReturnOfNull
return null;
})
);
boolean exceptionThrown = false;
try {
- op.go(new ExceptionalReceiver());
+ Operator.go(op, new ExceptionalReceiver());
}
catch (ISE e) {
Assert.assertEquals(e.getMessage(), "Segment[class
org.apache.druid.segment.TestSegmentForAs] cannot shapeshift");
@@ -104,7 +106,7 @@ public class SegmentToRowsAndColumnsOperatorTest
{
@Nullable
@Override
- public <T> T as(Class<T> clazz)
+ public <T> T as(@Nonnull Class<T> clazz)
{
Assert.assertEquals(RowsAndColumns.class, clazz);
return null;
@@ -121,7 +123,7 @@ public class SegmentToRowsAndColumnsOperatorTest
boolean exceptionThrown = false;
try {
- op.go(new ExceptionalReceiver());
+ Operator.go(op, new ExceptionalReceiver());
}
catch (ISE e) {
Assert.assertEquals(
@@ -147,9 +149,8 @@ public class SegmentToRowsAndColumnsOperatorTest
return new CloseableShapeshifter()
{
@SuppressWarnings("unchecked")
- @Nullable
@Override
- public <T> T as(Class<T> clazz)
+ public <T> T as(@Nonnull Class<T> clazz)
{
Assert.assertEquals(RowsAndColumns.class, clazz);
return (T) expectedRac;
@@ -170,7 +171,7 @@ public class SegmentToRowsAndColumnsOperatorTest
new OperatorTestHelper()
.withPushFn(rac -> {
Assert.assertSame(expectedRac, rac);
- return true;
+ return Operator.Signal.GO;
})
.runToCompletion(op);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
index 02070c2436..b5bcc177aa 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
@@ -38,14 +38,15 @@ public class SequenceOperatorTest
MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1}))
)));
+ final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
+ .expectColumn("hi", new int[]{1})
+ .allColumnsRegistered();
+
new OperatorTestHelper()
.withPushFn(
rac -> {
- new RowsAndColumnsHelper()
- .expectColumn("hi", new int[]{1})
- .allColumnsRegistered()
- .validate(rac);
- return true;
+ helper.validate(rac);
+ return Operator.Signal.GO;
}
)
.withFinalValidation(testReceiver -> Assert.assertEquals(2,
testReceiver.getNumPushed()))
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
new file mode 100644
index 0000000000..1e728eced2
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/operator/SortedInnerJoinOperatorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.query.operator.join.JoinConfig;
+import org.apache.druid.query.operator.join.JoinPartDefn;
+import org.apache.druid.query.operator.join.SortedInnerJoinOperator;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class SortedInnerJoinOperatorTest
+{
+
+ @Test
+ public void testSimpleTwoWayJoin()
+ {
+ SortedInnerJoinOperator joinMe = new SortedInnerJoinOperator(
+ Arrays.asList(
+ buildDefn(
+ MapOfColumnsRowsAndColumns
+ .builder()
+ .add("joinField", new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4})
+ .add("projectMe", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
+ .build()
+ )
+ .joinOn("joinField")
+ .project("joinField", "projectMe")
+ .build(),
+ buildDefn(
+ MapOfColumnsRowsAndColumns
+ .builder()
+ .add("joinField", new int[]{0, 1, 4})
+ .add("projectMeToo", ColumnType.STRING, "a", "b", "e")
+ .build()
+ )
+ .joinOn("joinField")
+ .project("projectMeToo")
+ .build()
+ ),
+ new JoinConfig(4)
+ );
+
+ new OperatorTestHelper()
+ .expectRowsAndColumns(
+ new RowsAndColumnsHelper()
+ .expectColumn("joinField", new int[]{0, 0, 0, 1, 1})
+ .expectColumn("projectMe", new int[]{3, 54, 21, 1, 5})
+ .expectColumn("projectMeToo", ColumnType.STRING, "a", "a",
"a", "b", "b")
+ .allColumnsRegistered(),
+ new RowsAndColumnsHelper()
+ .expectColumn("joinField", new int[]{4, 4, 4})
+ .expectColumn("projectMe", new int[]{2, 3, 92})
+ .expectColumn("projectMeToo", ColumnType.STRING, "e", "e", "e")
+ .allColumnsRegistered()
+ )
+ .runToCompletion(joinMe);
+ }
+
+ @Test
+ public void testSimpleThreeWayJoin()
+ {
+ SortedInnerJoinOperator joinMe = new SortedInnerJoinOperator(
+ Arrays.asList(
+ buildDefn(
+ MapOfColumnsRowsAndColumns
+ .builder()
+ .add("joinField", new double[]{0, 0, 0, 1, 1, 2, 4, 4, 4})
+ .add("projectMe", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
+ .build()
+ )
+ .joinOn("joinField")
+ .project("joinField", "projectMe")
+ .build(),
+ buildDefn(
+ MapOfColumnsRowsAndColumns
+ .builder()
+ .add("joinField", new double[]{0, 1, 4})
+ .add("projectMeToo", ColumnType.STRING, "a", "b", "e")
+ .build()
+ )
+ .joinOn("joinField")
+ .project("projectMeToo")
+ .build(),
+ buildDefn(
+ MapOfColumnsRowsAndColumns
+ .builder()
+ .add("joinField", new double[]{0, 0, 1, 2, 4})
+ .add("projectMeThree", ColumnType.STRING, "C", "CC", "B",
"Z", "A")
+ .build()
+ )
+ .joinOn("joinField")
+ .project("projectMeThree")
+ .build()
+ ),
+ new JoinConfig(4)
+ );
+
+ new OperatorTestHelper()
+ .expectRowsAndColumns(
+ new RowsAndColumnsHelper()
+ .expectColumn("joinField", new double[]{0, 0, 0, 0, 0, 0})
+ .expectColumn("projectMe", new int[]{3, 3, 54, 54, 21, 21})
+ .expectColumn("projectMeToo", ColumnType.STRING, "a", "a",
"a", "a", "a", "a")
+ .expectColumn("projectMeThree", ColumnType.STRING, "C", "CC",
"C", "CC", "C", "CC")
+ .allColumnsRegistered(),
+ new RowsAndColumnsHelper()
+ .expectColumn("joinField", new double[]{1, 1, 4, 4, 4})
+ .expectColumn("projectMe", new int[]{1, 5, 2, 3, 92})
+ .expectColumn("projectMeToo", ColumnType.STRING, "b", "b",
"e", "e", "e")
+ .expectColumn("projectMeThree", ColumnType.STRING, "B", "B",
"A", "A", "A")
+ .allColumnsRegistered()
+ )
+ .runToCompletion(joinMe);
+ }
+
+ private JoinPartDefn.Builder buildDefn(
+ RowsAndColumns... racs
+ )
+ {
+ return JoinPartDefn.builder(InlineScanOperator.make(racs));
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
index dc2e6e9732..81b684fdc3 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/WindowOperatorQueryTest.java
@@ -35,12 +35,9 @@ import java.util.Set;
/**
* Tests the WindowOperatorQuery, it would actually be a lot better to run
this through some tests that actually
* validate the operation of queries, but all of the efforts to build out test
scaffolding and framework have gone
- * into building things out for SQL query operations. As such, all of the
tests that validating the actual native
- * functionality actually run from the `druid-sql` module instead of this
module. It would be best to de-couple
- * these and have all of the native, query processing tests happen directly
here in processing and have the SQL
- * tests only concern themselves with how they plan SQL into Native, but
that's a bit big of a nugget to bite off
- * at this point in time, so instead we continue the building of technical
debt by making this "test" run lines
- * of code without actually testing much meaningful behavior.
+ * into building things out for SQL query operations. As such, all of the
tests validating the actual native
+ * functionality actually run from the `druid-sql` module instead of this
module. It would be really cool to move
+ * the SQL stuff into the processing module so that it can be handled in the
same location here.
* <p>
* For now, view CalciteWindowQueryTest for actual tests that validate
behavior.
*/
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
index 73d3060b80..b3a4716432 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
@@ -61,7 +61,7 @@ public class WindowProcessorOperatorTest
.withPushFn(
rowsAndColumns -> {
Assert.assertSame(rac, rowsAndColumns);
- return true;
+ return Operator.Signal.GO;
}
)
.runToCompletion(op);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 65421b4078..ee370aa5b6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -39,11 +39,11 @@ import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.NaiveSortOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
-import org.apache.druid.query.operator.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.operator.window.WindowFramedAggregateProcessor;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowCumeDistProcessor;
import org.apache.druid.query.operator.window.ranking.WindowDenseRankProcessor;
import
org.apache.druid.query.operator.window.ranking.WindowPercentileProcessor;
@@ -380,6 +380,9 @@ public class Windowing
{
RexNode columnArgument = Expressions.fromFieldAccess(sig, project,
call.getArgList().get(argPosition));
final DruidExpression expression =
Expressions.toDruidExpression(context, sig, columnArgument);
+ if (expression == null) {
+ throw new ISE("Couldn't get an expression from columnArgument[%s]",
columnArgument);
+ }
return expression.getDirectColumn();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]