http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/RangeIntersectionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/utils/RangeIntersectionIterator.java 
b/src/java/org/apache/cassandra/index/sasi/utils/RangeIntersectionIterator.java
new file mode 100644
index 0000000..0d2214a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/index/sasi/utils/RangeIntersectionIterator.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.collect.Iterators;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class RangeIntersectionIterator
+{
+    protected enum Strategy
+    {
+        BOUNCE, LOOKUP, ADAPTIVE
+    }
+
+    public static <K extends Comparable<K>, D extends CombinedValue<K>> 
Builder<K, D> builder()
+    {
+        return builder(Strategy.ADAPTIVE);
+    }
+
+    @VisibleForTesting
+    protected static <K extends Comparable<K>, D extends CombinedValue<K>> 
Builder<K, D> builder(Strategy strategy)
+    {
+        return new Builder<>(strategy);
+    }
+
+    public static class Builder<K extends Comparable<K>, D extends 
CombinedValue<K>> extends RangeIterator.Builder<K, D>
+    {
+        private final Strategy strategy;
+
+        public Builder(Strategy strategy)
+        {
+            super(IteratorType.INTERSECTION);
+            this.strategy = strategy;
+        }
+
+        protected RangeIterator<K, D> buildIterator()
+        {
+            // if the range is disjoint we can simply return empty
+            // iterator of any type, because it's not going to produce any 
results.
+            if (statistics.isDisjoint()) // NOTE(review): the iterator below gets a fresh empty queue, so its close() will not close the ranges added to this builder - confirm callers release them
+                return new BounceIntersectionIterator<>(statistics, new 
PriorityQueue<RangeIterator<K, D>>(1));
+
+            switch (strategy)
+            {
+                case LOOKUP:
+                    return new LookupIntersectionIterator<>(statistics, 
ranges);
+
+                case BOUNCE:
+                    return new BounceIntersectionIterator<>(statistics, 
ranges);
+
+                case ADAPTIVE:
+                    return statistics.sizeRatio() <= 0.01d // token count of the smallest range is at most 1% of the largest one -> lookup, otherwise bounce
+                            ? new LookupIntersectionIterator<>(statistics, 
ranges)
+                            : new BounceIntersectionIterator<>(statistics, 
ranges);
+
+                default:
+                    throw new IllegalStateException("Unknown strategy: " + 
strategy);
+            }
+        }
+    }
+
+    private static abstract class AbstractIntersectionIterator<K extends 
Comparable<K>, D extends CombinedValue<K>> extends RangeIterator<K, D>
+    {
+        protected final PriorityQueue<RangeIterator<K, D>> ranges;
+
+        private AbstractIntersectionIterator(Builder.Statistics<K, D> 
statistics, PriorityQueue<RangeIterator<K, D>> ranges)
+        {
+            super(statistics);
+            this.ranges = ranges;
+        }
+
+        public void close() throws IOException
+        {
+            for (RangeIterator<K, D> range : ranges)
+                FileUtils.closeQuietly(range);
+        }
+    }
+
+    /**
+     * Iterator which performs intersection of multiple ranges by using 
bouncing (merge-join) technique to identify
+     * common elements in the given ranges. Aforementioned "bounce" works as 
follows: range queue is poll'ed for the
+     * range with the smallest current token (main loop), that token is used 
to {@link RangeIterator#skipTo(Comparable)}
+     * other ranges, if token produced by {@link 
RangeIterator#skipTo(Comparable)} is equal to current "candidate" token,
+     * both get merged together and the same operation is repeated for next 
range from the queue, if returned token
+     * is not equal to the candidate, candidate's range gets put back into the 
queue and the main loop gets repeated until
+     * next intersection token is found or at least one iterator runs out of 
tokens.
+     *
+     * This technique is very efficient at jumping over gaps in the ranges.
+     *
+     * @param <K> The type used to sort ranges.
+     * @param <D> The container type which is going to be returned by {@link 
Iterator#next()}.
+     */
+    @VisibleForTesting
+    protected static class BounceIntersectionIterator<K extends Comparable<K>, 
D extends CombinedValue<K>> extends AbstractIntersectionIterator<K, D>
+    {
+        private BounceIntersectionIterator(Builder.Statistics<K, D> 
statistics, PriorityQueue<RangeIterator<K, D>> ranges)
+        {
+            super(statistics, ranges);
+        }
+
+        /**
+         * Computes the next element which is present in every range, using the "bounce" technique.
+         *
+         * The range with the smallest current token is polled as the head and its next element becomes
+         * the candidate; every other range is then skipped to the candidate's token. The candidate is
+         * returned only if all of the ranges produced an equal token (those elements are merged into it),
+         * otherwise the main loop is repeated on the re-populated queue.
+         *
+         * Every range polled from the queue is put back before returning, including the early-exit paths,
+         * so that {@code close()} always sees all of the underlying ranges.
+         *
+         * @return The merged element common to all ranges, or the result of {@code endOfData()} when
+         *         the head is exhausted, the candidate is bigger than the intersection maximum,
+         *         or one of the other ranges is exhausted or no longer overlaps with the head.
+         */
+        protected D computeNext()
+        {
+            while (!ranges.isEmpty())
+            {
+                RangeIterator<K, D> head = ranges.poll();
+
+                // jump right to the beginning of the intersection or return next element
+                if (head.getCurrent().compareTo(getMinimum()) < 0)
+                    head.skipTo(getMinimum());
+
+                D candidate = head.hasNext() ? head.next() : null;
+                if (candidate == null || candidate.get().compareTo(getMaximum()) > 0)
+                {
+                    ranges.add(head);
+                    return endOfData();
+                }
+
+                List<RangeIterator<K, D>> processed = new ArrayList<>();
+
+                boolean intersectsAll = true, exhausted = false;
+                while (!ranges.isEmpty())
+                {
+                    RangeIterator<K, D> range = ranges.poll();
+
+                    // remember the range as soon as it leaves the queue, so it is re-added below on every
+                    // exit path; breaking out before this point would drop the range from the queue
+                    // and close() would never release it.
+                    processed.add(range);
+
+                    // found a range which doesn't overlap with one (or possibly more) other range(s)
+                    if (!isOverlapping(head, range))
+                    {
+                        exhausted = true;
+                        intersectsAll = false;
+                        break;
+                    }
+
+                    D point = range.skipTo(candidate.get());
+
+                    if (point == null) // other range is exhausted
+                    {
+                        exhausted = true;
+                        intersectsAll = false;
+                        break;
+                    }
+
+                    if (candidate.get().equals(point.get()))
+                    {
+                        candidate.merge(point);
+                        // advance skipped range to the next element if any
+                        Iterators.getNext(range, null);
+                    }
+                    else
+                    {
+                        // tokens differ: candidate is not part of the intersection, restart the main loop
+                        intersectsAll = false;
+                        break;
+                    }
+                }
+
+                // put everything polled during this round back, which also re-prioritizes the queue
+                ranges.add(head);
+
+                for (RangeIterator<K, D> range : processed)
+                    ranges.add(range);
+
+                if (exhausted)
+                    return endOfData();
+
+                if (intersectsAll)
+                    return candidate;
+            }
+
+            return endOfData();
+        }
+
+        /**
+         * Skips every range of the intersection forward to the given token.
+         *
+         * @param nextToken The token to skip all of the ranges to.
+         */
+        protected void performSkipTo(K nextToken)
+        {
+            // drain the queue first and re-insert afterwards, as skipping
+            // moves the current token of a range which the queue is ordered by.
+            List<RangeIterator<K, D>> drained = new ArrayList<>();
+
+            for (RangeIterator<K, D> next = ranges.poll(); next != null; next = ranges.poll())
+            {
+                next.skipTo(nextToken);
+                drained.add(next);
+            }
+
+            ranges.addAll(drained);
+        }
+    }
+
+    /**
+     * Iterator which performs a linear scan over a primary range (the 
smallest of the ranges)
+     * and O(log(n)) lookup into secondary ranges using values from the 
primary iterator.
+     * This technique is efficient when one of the intersection ranges is 
smaller than others
+     * e.g. ratio 0.01d (default), in such a situation scan + lookup is more 
efficient compared
+     * to "bounce" merge because "bounce" distance is never going to be big.
+     *
+     * @param <K> The type used to sort ranges.
+     * @param <D> The container type which is going to be returned by {@link 
Iterator#next()}.
+     */
+    @VisibleForTesting
+    protected static class LookupIntersectionIterator<K extends Comparable<K>, 
D extends CombinedValue<K>> extends AbstractIntersectionIterator<K, D>
+    {
+        private final RangeIterator<K, D> smallestIterator;
+
+        private LookupIntersectionIterator(Builder.Statistics<K, D> 
statistics, PriorityQueue<RangeIterator<K, D>> ranges)
+        {
+            super(statistics, ranges);
+
+            smallestIterator = statistics.minRange;
+
+            if (smallestIterator.getCurrent().compareTo(getMinimum()) < 0)
+                smallestIterator.skipTo(getMinimum());
+        }
+
+        protected D computeNext()
+        {
+            while (smallestIterator.hasNext())
+            {
+                D candidate = smallestIterator.next();
+                K token = candidate.get();
+
+                boolean intersectsAll = true;
+                for (RangeIterator<K, D> range : ranges)
+                {
+                    // avoid checking against self, much cheaper than changing 
queue comparator
+                    // to compare based on the size and re-populating such 
queue.
+                    if (range.equals(smallestIterator))
+                        continue;
+
+                    // found a range which doesn't overlap with one (or 
possibly more) other range(s)
+                    if (!isOverlapping(smallestIterator, range))
+                        return endOfData();
+
+                    D point = range.skipTo(token);
+
+                    if (point == null) // one of the iterators is exhausted
+                        return endOfData();
+
+                    if (!point.get().equals(token))
+                    {
+                        intersectsAll = false;
+                        break;
+                    }
+
+                    candidate.merge(point);
+                }
+
+                if (intersectsAll)
+                    return candidate;
+            }
+
+            return endOfData();
+        }
+
+        protected void performSkipTo(K nextToken)
+        {
+            smallestIterator.skipTo(nextToken);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/RangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/RangeIterator.java 
b/src/java/org/apache/cassandra/index/sasi/utils/RangeIterator.java
new file mode 100644
index 0000000..1b5aee4
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/RangeIterator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.Closeable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class RangeIterator<K extends Comparable<K>, T extends 
CombinedValue<K>> extends AbstractIterator<T> implements Closeable
+{
+    private final K min, max;
+    private final long count;
+    private K current;
+
+    protected RangeIterator(Builder.Statistics<K, T> statistics)
+    {
+        this(statistics.min, statistics.max, statistics.tokenCount);
+    }
+
+    public RangeIterator(RangeIterator<K, T> range)
+    {
+        this(range == null ? null : range.min, range == null ? null : 
range.max, range == null ? -1 : range.count);
+    }
+
+    public RangeIterator(K min, K max, long count)
+    {
+        this.min = min;
+        this.current = min;
+        this.max = max;
+        this.count = count;
+    }
+
+    public final K getMinimum()
+    {
+        return min;
+    }
+
+    public final K getCurrent()
+    {
+        return current;
+    }
+
+    public final K getMaximum()
+    {
+        return max;
+    }
+
+    public final long getCount()
+    {
+        return count;
+    }
+
+    /**
+     * When called, this iterator's current position should
+     * be skipped forwards until finding either:
+     *   1) an element equal to or bigger than nextToken
+     *   2) the end of the iterator
+     *
+     * @param nextToken value to skip the iterator forward until matching
+     *
+     * @return The next current token after the skip was performed
+     */
+    public final T skipTo(K nextToken)
+    {
+        if (min == null || max == null)
+            return endOfData();
+
+        if (current.compareTo(nextToken) >= 0)
+            return next == null ? recomputeNext() : next;
+
+        if (max.compareTo(nextToken) < 0)
+            return endOfData();
+
+        performSkipTo(nextToken);
+        return recomputeNext();
+    }
+
+    protected abstract void performSkipTo(K nextToken);
+
+    protected T recomputeNext()
+    {
+        return tryToComputeNext() ? peek() : endOfData();
+    }
+
+    protected boolean tryToComputeNext()
+    {
+        boolean hasNext = super.tryToComputeNext();
+        current = hasNext ? next.get() : getMaximum();
+        return hasNext;
+    }
+
+    public static abstract class Builder<K extends Comparable<K>, D extends 
CombinedValue<K>>
+    {
+        public enum IteratorType
+        {
+            UNION, INTERSECTION
+        }
+
+        @VisibleForTesting
+        protected final Statistics<K, D> statistics;
+
+        @VisibleForTesting
+        protected final PriorityQueue<RangeIterator<K, D>> ranges;
+
+        public Builder(IteratorType type)
+        {
+            statistics = new Statistics<>(type);
+            ranges = new PriorityQueue<>(16, (Comparator<RangeIterator<K, D>>) 
(a, b) -> a.getCurrent().compareTo(b.getCurrent()));
+        }
+
+        public K getMinimum()
+        {
+            return statistics.min;
+        }
+
+        public K getMaximum()
+        {
+            return statistics.max;
+        }
+
+        public long getTokenCount()
+        {
+            return statistics.tokenCount;
+        }
+
+        public int rangeCount()
+        {
+            return ranges.size();
+        }
+
+        public Builder<K, D> add(RangeIterator<K, D> range)
+        {
+            if (range == null || range.getMinimum() == null || 
range.getMaximum() == null)
+                return this;
+
+            ranges.add(range);
+            statistics.update(range);
+
+            return this;
+        }
+
+        public Builder<K, D> add(List<RangeIterator<K, D>> ranges)
+        {
+            if (ranges == null || ranges.isEmpty())
+                return this;
+
+            ranges.forEach(this::add);
+            return this;
+        }
+
+        public final RangeIterator<K, D> build()
+        {
+            switch (rangeCount())
+            {
+                case 0:
+                    return null; // nothing was added: callers receive null, not an empty iterator
+
+                case 1:
+                    return ranges.poll(); // a single range needs no combining, it is handed back as-is
+
+                default:
+                    return buildIterator(); // two or more ranges: delegate to the union/intersection specific builder
+            }
+        }
+
+        protected abstract RangeIterator<K, D> buildIterator();
+
+        public static class Statistics<K extends Comparable<K>, D extends 
CombinedValue<K>>
+        {
+            protected final IteratorType iteratorType;
+
+            protected K min, max;
+            protected long tokenCount;
+
+            // iterator with the least number of items
+            protected RangeIterator<K, D> minRange;
+            // iterator with the most number of items
+            protected RangeIterator<K, D> maxRange;
+
+            // tracks if all of the added ranges overlap, which is useful in 
case of intersection,
+            // as it gives a direct answer as to whether such an iterator is going to 
produce any results.
+            protected boolean isOverlapping = true;
+
+            public Statistics(IteratorType iteratorType)
+            {
+                this.iteratorType = iteratorType;
+            }
+
+            /**
+             * Update statistics information with the given range.
+             *
+             * Updates min/max of the combined range, token count and
+             * tracks range with the least/most number of tokens.
+             *
+             * @param range The range to update statistics with.
+             */
+            public void update(RangeIterator<K, D> range)
+            {
+                switch (iteratorType)
+                {
+                    case UNION:
+                        min = min == null || min.compareTo(range.getMinimum()) 
> 0 ? range.getMinimum() : min;
+                        max = max == null || max.compareTo(range.getMaximum()) 
< 0 ? range.getMaximum() : max;
+                        break;
+
+                    case INTERSECTION:
+                        // minimum of the intersection is the biggest minimum 
of individual iterators
+                        min = min == null || min.compareTo(range.getMinimum()) 
< 0 ? range.getMinimum() : min;
+                        // maximum of the intersection is the smallest maximum 
of individual iterators
+                        max = max == null || max.compareTo(range.getMaximum()) 
> 0 ? range.getMaximum() : max;
+                        break;
+
+                    default:
+                        throw new IllegalStateException("Unknown iterator 
type: " + iteratorType);
+                }
+
+                // check if new range is disjoint with already added ranges, 
which means that this intersection
+                // is not going to produce any results, so we can clean up 
range storage and never add anything to it.
+                isOverlapping &= isOverlapping(min, max, range);
+
+                minRange = minRange == null ? range : min(minRange, range);
+                maxRange = maxRange == null ? range : max(maxRange, range);
+
+                tokenCount += range.getCount();
+
+            }
+
+            private RangeIterator<K, D> min(RangeIterator<K, D> a, 
RangeIterator<K, D> b)
+            {
+                return a.getCount() > b.getCount() ? b : a;
+            }
+
+            private RangeIterator<K, D> max(RangeIterator<K, D> a, 
RangeIterator<K, D> b)
+            {
+                return a.getCount() > b.getCount() ? a : b;
+            }
+
+            public boolean isDisjoint()
+            {
+                return !isOverlapping;
+            }
+
+            public double sizeRatio()
+            {
+                return minRange.getCount() * 1d / maxRange.getCount();
+            }
+        }
+    }
+
+    @VisibleForTesting
+    protected static <K extends Comparable<K>, D extends CombinedValue<K>> 
boolean isOverlapping(RangeIterator<K, D> a, RangeIterator<K, D> b)
+    {
+        return isOverlapping(a.getCurrent(), a.getMaximum(), b);
+    }
+
+    @VisibleForTesting
+    protected static <K extends Comparable<K>, D extends CombinedValue<K>> 
boolean isOverlapping(K min, K max, RangeIterator<K, D> b)
+    {
+        return min.compareTo(b.getMaximum()) <= 0 && 
b.getCurrent().compareTo(max) <= 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/RangeUnionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/utils/RangeUnionIterator.java 
b/src/java/org/apache/cassandra/index/sasi/utils/RangeUnionIterator.java
new file mode 100644
index 0000000..310b6d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/RangeUnionIterator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Range Union Iterator is used to return sorted stream of elements from 
multiple RangeIterator instances.
+ *
+ * PriorityQueue is used as a sorting mechanism for the ranges, where each 
computeNext() operation would poll
+ * from the queue (and push when done), which returns range that contains the 
smallest element, because
+ * sorting is done on the moving window of range iteration {@link 
RangeIterator#getCurrent()}. Once retrieved
+ * the smallest element (return candidate) is attempted to be merged with 
other ranges, because there could
+ * be equal elements in adjacent ranges, such ranges are poll'ed only if their 
{@link RangeIterator#getCurrent()}
+ * equals to the return candidate.
+ *
+ * @param <K> The type used to sort ranges.
+ * @param <D> The container type which is going to be returned by {@link 
Iterator#next()}.
+ */
+public class RangeUnionIterator<K extends Comparable<K>, D extends 
CombinedValue<K>> extends RangeIterator<K, D>
+{
+    private final PriorityQueue<RangeIterator<K, D>> ranges;
+
+    private RangeUnionIterator(Builder.Statistics<K, D> statistics, 
PriorityQueue<RangeIterator<K, D>> ranges)
+    {
+        super(statistics);
+        this.ranges = ranges;
+    }
+
+    public D computeNext()
+    {
+        RangeIterator<K, D> head = null;
+
+        while (!ranges.isEmpty())
+        {
+            head = ranges.poll();
+            if (head.hasNext())
+                break;
+
+            FileUtils.closeQuietly(head);
+        }
+
+        if (head == null || !head.hasNext())
+            return endOfData();
+
+        D candidate = head.next();
+
+        List<RangeIterator<K, D>> processedRanges = new ArrayList<>();
+
+        if (head.hasNext())
+            processedRanges.add(head);
+        else
+            FileUtils.closeQuietly(head);
+
+        while (!ranges.isEmpty())
+        {
+            // peek here instead of poll is an optimization
+            // so we can re-insert fewer ranges back if candidate
+            // is less than head of the current range.
+            RangeIterator<K, D> range = ranges.peek();
+
+            int cmp = candidate.get().compareTo(range.getCurrent());
+
+            assert cmp <= 0;
+
+            if (cmp < 0)
+            {
+                break; // candidate is smaller than next token, return 
immediately
+            }
+            else if (cmp == 0)
+            {
+                candidate.merge(range.next()); // consume and merge
+
+                range = ranges.poll();
+                // re-prioritize changed range
+
+                if (range.hasNext())
+                    processedRanges.add(range);
+                else
+                    FileUtils.closeQuietly(range);
+            }
+        }
+
+        ranges.addAll(processedRanges);
+        return candidate;
+    }
+
+    /**
+     * Advances every queued range whose current token is smaller than the given token.
+     *
+     * Ranges whose maximum is smaller than {@code nextToken} are closed and dropped from the queue,
+     * all of the others are skipped forward and re-inserted, which re-prioritizes them.
+     *
+     * @param nextToken The token to skip the ranges to.
+     */
+    protected void performSkipTo(K nextToken)
+    {
+        List<RangeIterator<K, D>> changedRanges = new ArrayList<>();
+
+        while (!ranges.isEmpty())
+        {
+            // head of the queue is already at (or past) the requested token, nothing more to skip
+            if (ranges.peek().getCurrent().compareTo(nextToken) >= 0)
+                break;
+
+            RangeIterator<K, D> head = ranges.poll();
+
+            if (head.getMaximum().compareTo(nextToken) >= 0)
+            {
+                head.skipTo(nextToken);
+                changedRanges.add(head);
+                continue;
+            }
+
+            // the whole range is before the requested token, it can't contribute anymore
+            FileUtils.closeQuietly(head);
+        }
+
+        // re-insert directly, there is no need to copy the list through a stream first
+        ranges.addAll(changedRanges);
+    }
+
+    public void close() throws IOException
+    {
+        ranges.forEach(FileUtils::closeQuietly);
+    }
+
+    public static <K extends Comparable<K>, D extends CombinedValue<K>> 
Builder<K, D> builder()
+    {
+        return new Builder<>();
+    }
+
+    public static <K extends Comparable<K>, D extends CombinedValue<K>> 
RangeIterator<K, D> build(List<RangeIterator<K, D>> tokens)
+    {
+        return new Builder<K, D>().add(tokens).build();
+    }
+
+    public static class Builder<K extends Comparable<K>, D extends 
CombinedValue<K>> extends RangeIterator.Builder<K, D>
+    {
+        public Builder()
+        {
+            super(IteratorType.UNION);
+        }
+
+        protected RangeIterator<K, D> buildIterator()
+        {
+            return new RangeUnionIterator<>(statistics, ranges);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/TypeUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/TypeUtil.java 
b/src/java/org/apache/cassandra/index/sasi/utils/TypeUtil.java
new file mode 100644
index 0000000..8b38530
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/TypeUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.MarshalException;
+
+/**
+ * Static helpers to validate a serialized term against an {@link AbstractType}
+ * and to convert a term serialized with a narrower type into the validator's type.
+ */
+public class TypeUtil
+{
+    /**
+     * Checks if the given term passes validation of the given type.
+     *
+     * @param term The serialized term.
+     * @param validator The type to validate the term with.
+     *
+     * @return true if {@code validator.validate(term)} succeeds, false if it throws {@link MarshalException}.
+     */
+    public static boolean isValid(ByteBuffer term, AbstractType<?> validator)
+    {
+        try
+        {
+            validator.validate(term);
+            return true;
+        }
+        catch (MarshalException e)
+        {
+            return false;
+        }
+    }
+
+    /**
+     * Tries to re-serialize the given term with the validator's type:
+     *   - Int32Type: a 2 byte term is read as short and widened to int;
+     *   - LongType: a 2 byte term is read as short, a 4 byte term as int, anything else is parsed
+     *     as a number from its UTF-8 string form;
+     *   - DoubleType: a 4 byte term is read as float and widened to double;
+     *   - otherwise the term is treated as a UTF-8 string and handed to {@code validator.fromString}.
+     *
+     * @param term The serialized term, its position is not modified by the absolute short read.
+     * @param validator The type the term should be converted to.
+     *
+     * @return The converted term, or null if the term is empty or the conversion failed for any reason.
+     */
+    public static ByteBuffer tryUpcast(ByteBuffer term, AbstractType<?> validator)
+    {
+        if (term.remaining() == 0)
+            return null;
+
+        try
+        {
+            if (validator instanceof Int32Type && term.remaining() == 2)
+            {
+                return Int32Type.instance.decompose((int) term.getShort(term.position()));
+            }
+            else if (validator instanceof LongType)
+            {
+                long upcastToken;
+
+                switch (term.remaining())
+                {
+                    case 2:
+                        upcastToken = (long) term.getShort(term.position());
+                        break;
+
+                    case 4:
+                        upcastToken = (long) Int32Type.instance.compose(term);
+                        break;
+
+                    default:
+                        // parse straight into a primitive, Long.valueOf would box only to be unboxed again
+                        upcastToken = Long.parseLong(UTF8Type.instance.getString(term));
+                }
+
+                return LongType.instance.decompose(upcastToken);
+            }
+            else if (validator instanceof DoubleType && term.remaining() == 4)
+            {
+                return DoubleType.instance.decompose((double) FloatType.instance.compose(term));
+            }
+
+            // maybe it was a string after all
+            return validator.fromString(UTF8Type.instance.getString(term));
+        }
+        catch (Exception e)
+        {
+            // best-effort conversion: any failure simply means the term can't be upcast
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractPatriciaTrie.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractPatriciaTrie.java 
b/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractPatriciaTrie.java
new file mode 100644
index 0000000..b359416
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractPatriciaTrie.java
@@ -0,0 +1,1151 @@
+/*
+ * Copyright 2005-2010 Roger Kapsi, Sam Berlin
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+/**
+ * This class is taken from https://github.com/rkapsi/patricia-trie (v0.6),
+ * and slightly modified to correspond to Cassandra code style, as the only
+ * Patricia Trie implementation which supports pluggable key comparators
+ * (e.g. the commons-collections PatriciaTrie, which is based on the
+ * rkapsi/patricia-trie project, only supports String keys) but which
+ * unfortunately is not deployed to Maven Central as a downloadable artifact.
+ */
+
+package org.apache.cassandra.index.sasi.utils.trie;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.utils.trie.Cursor.Decision;
+
+/**
+ * This class implements the base PATRICIA algorithm and everything that
+ * is related to the {@link Map} interface.
+ */
+abstract class AbstractPatriciaTrie<K, V> extends AbstractTrie<K, V>
+{
+    private static final long serialVersionUID = -2303909182832019043L;
+
+    /**
+     * The root node of the {@link Trie}. It is constructed with a null key,
+     * a null value and a bit index of -1.
+     */
+    final TrieEntry<K, V> root = new TrieEntry<>(null, null, -1);
+    
+    /**
+     * Each of these fields is initialized to contain an instance of the
+     * appropriate view the first time this view is requested. The views are
+     * stateless, so there's no reason to create more than one of each.
+     */
+    private transient volatile Set<K> keySet;
+    private transient volatile Collection<V> values;
+    private transient volatile Set<Map.Entry<K,V>> entrySet;
+    
+    /**
+     * The current size of the {@link Trie}
+     */
+    private int size = 0;
+    
+    /**
+     * The number of times this {@link Trie} has been modified.
+     * It's used to detect concurrent modifications and fail-fast
+     * the {@link Iterator}s.
+     */
+    transient int modCount = 0;
+    
+    /**
+     * Creates an empty trie that uses the given {@link KeyAnalyzer}.
+     *
+     * @param keyAnalyzer handed to the superclass constructor
+     */
+    public AbstractPatriciaTrie(KeyAnalyzer<? super K> keyAnalyzer)
+    {
+        super(keyAnalyzer);
+    }
+    
+    /**
+     * Creates a trie that uses the given {@link KeyAnalyzer} and then copies
+     * the mappings of {@code m} into it through {@code putAll}.
+     *
+     * @param keyAnalyzer handed to the superclass constructor
+     * @param m           the mappings to insert
+     */
+    public AbstractPatriciaTrie(KeyAnalyzer<? super K> keyAnalyzer, Map<? 
extends K, ? extends V> m)
+    {
+        super(keyAnalyzer);
+        // NOTE(review): putAll is overridable and runs before any subclass constructor body
+        putAll(m);
+    }
+    
+    /**
+     * Empties the trie by resetting the root entry to the same state a freshly
+     * constructed root has, zeroing the size and bumping the modification counter.
+     */
+    @Override
+    public void clear()
+    {
+        // payload of the root
+        root.key = null;
+        root.value = null;
+        root.bitIndex = -1;
+
+        // links of the root: it loops back to itself on the left and has no right side
+        root.left = root;
+        root.right = null;
+        root.parent = null;
+        root.predecessor = root;
+
+        size = 0;
+        incrementModCount();
+    }
+    
+    /**
+     * @return the number of entries currently tracked by this trie
+     */
+    @Override
+    public int size()
+    {
+        return this.size;
+    }
+   
+    /**
+     * Records the addition of one entry: grows the size by one
+     * and bumps the modification counter.
+     */
+    void incrementSize()
+    {
+        size += 1;
+        incrementModCount();
+    }
+    
+    /**
+     * Records the removal of one entry: shrinks the size by one
+     * and bumps the modification counter.
+     */
+    void decrementSize()
+    {
+        size -= 1;
+        incrementModCount();
+    }
+    
+    /**
+     * Bumps the modification counter that iterators compare against
+     * to detect changes made behind their back.
+     */
+    private void incrementModCount()
+    {
+        modCount++;
+    }
+    
+    /**
+     * Stores the given key/value pair, either by overwriting an existing
+     * entry (including the root) or by linking a new {@link TrieEntry}.
+     *
+     * @param key   the key, must not be null
+     * @param value the value to associate with the key
+     * @return whatever {@code setKeyValue} returns when an existing entry is
+     *         overwritten (presumably the previous value), or {@code null}
+     *         when a new entry is added
+     * @throws NullPointerException      if {@code key} is null
+     * @throws IndexOutOfBoundsException if none of the handled bit index cases applies
+     */
+    @Override
+    public V put(K key, V value)
+    {
+        if (key == null)
+            throw new NullPointerException("Key cannot be null");
+        
+        int lengthInBits = lengthInBits(key);
+        
+        // The only place to store a key with a length
+        // of zero bits is the root node
+        if (lengthInBits == 0)
+        {
+            // size only grows if the root was not holding a key yet
+            if (root.isEmpty())
+                incrementSize();
+            else
+                incrementModCount();
+
+            return root.setKeyValue(key, value);
+        }
+        
+        TrieEntry<K, V> found = getNearestEntryForKey(key);
+        if (compareKeys(key, found.key))
+        {
+            if (found.isEmpty()) // <- must be the root
+                incrementSize();
+            else
+                incrementModCount();
+
+            return found.setKeyValue(key, value);
+        }
+        
+        // the keys differ: bitIndex tells where (or signals one of the special cases below)
+        int bitIndex = bitIndex(key, found.key);
+        if (!Tries.isOutOfBoundsIndex(bitIndex))
+        {
+            if (Tries.isValidBitIndex(bitIndex)) // in 99.999...9% the case
+            {
+                /* NEW KEY+VALUE TUPLE */
+                TrieEntry<K, V> t = new TrieEntry<>(key, value, bitIndex);
+                addEntry(t);
+                incrementSize();
+                return null;
+            }
+            else if (Tries.isNullBitKey(bitIndex))
+            {
+                // All bits of the Key are zero. The only place to
+                // store such a Key is the root Node!
+                
+                /* NULL BIT KEY */
+                if (root.isEmpty())
+                    incrementSize();
+                else
+                    incrementModCount();
+
+                return root.setKeyValue(key, value);
+                
+            }
+            else if (Tries.isEqualBitKey(bitIndex))
+            {
+                // This is a very special and rare case.
+                
+                /* REPLACE OLD KEY+VALUE */
+                if (found != root) {
+                    incrementModCount();
+                    return found.setKeyValue(key, value);
+                }
+            }
+        }
+        
+        // reached for an out-of-bounds index, an unrecognized index,
+        // or an equal-bit key whose nearest entry is the root
+        throw new IndexOutOfBoundsException("Failed to put: " 
+                + key + " -> " + value + ", " + bitIndex);
+    }
+    
+    /**
+     * Adds the given {@link TrieEntry} to the {@link Trie}.
+     *
+     * Walks down from the root, choosing left/right by the entry key's bit at
+     * each visited node, until it reaches either a node whose bit index is not
+     * smaller than the new entry's or an uplink; the entry is spliced in there.
+     *
+     * @param entry the entry to link in; its key and bitIndex must already be set
+     * @return the same entry that was passed in
+     */
+    TrieEntry<K, V> addEntry(TrieEntry<K, V> entry)
+    {
+        TrieEntry<K, V> current = root.left;
+        TrieEntry<K, V> path = root;
+
+        while(true)
+        {
+            // insertion point: current is "below" the new entry, or current is an uplink
+            if (current.bitIndex >= entry.bitIndex || current.bitIndex <= 
path.bitIndex)
+            {
+                entry.predecessor = entry;
+                
+                // one side loops back to the entry itself, the other takes over 'current'
+                if (!isBitSet(entry.key, entry.bitIndex))
+                {
+                    entry.left = entry;
+                    entry.right = current;
+                }
+                else
+                {
+                    entry.left = current;
+                    entry.right = entry;
+                }
+               
+                entry.parent = path;
+                // current only becomes a child of the entry if it sits at or below the entry's bit index
+                if (current.bitIndex >= entry.bitIndex)
+                    current.parent = entry;
+                
+                // if we inserted an uplink, set the predecessor on it
+                if (current.bitIndex <= path.bitIndex)
+                    current.predecessor = entry;
+         
+                // hook the entry into the slot 'current' used to occupy
+                if (path == root || !isBitSet(entry.key, path.bitIndex))
+                    path.left = entry;
+                else
+                    path.right = entry;
+                
+                return entry;
+            }
+                
+            path = current;
+            
+            current = !isBitSet(entry.key, current.bitIndex)
+                       ? current.left : current.right;
+        }
+    }
+    
+    /**
+     * @return the value of the entry found for {@code k},
+     *         or {@code null} if there is no such entry
+     */
+    @Override
+    public V get(Object k)
+    {
+        TrieEntry<K, V> match = getEntry(k);
+        if (match == null)
+            return null;
+
+        return match.getValue();
+    }
+
+    /**
+     * Looks up the entry stored for the given key.
+     *
+     * This may throw ClassCastException if the object is not of type K.
+     *
+     * @return the matching entry, or {@code null} if the key is null or
+     *         the trie holds no mapping for it
+     */
+    TrieEntry<K,V> getEntry(Object k)
+    {
+        K key = Tries.cast(k);
+        if (key == null)
+            return null;
+
+        TrieEntry<K,V> nearest = getNearestEntryForKey(key);
+
+        // the nearest entry may be the empty root or hold a different key
+        if (nearest.isEmpty() || !compareKeys(key, nearest.key))
+            return null;
+
+        return nearest;
+    }
+    
+    @Override
+    public Map.Entry<K, V> select(K key)
+    {
+        Reference<Map.Entry<K, V>> reference = new Reference<>();
+        return !selectR(root.left, -1, key, reference) ? reference.get() : 
null;
+    }
+    
+    /**
+     * Runs the cursor-driven search for {@code key}.
+     *
+     * @return the entry the search stopped at, or {@code null}
+     *         if the cursor never asked to exit
+     */
+    @Override
+    public Map.Entry<K,V> select(K key, Cursor<? super K, ? super V> cursor)
+    {
+        final Reference<Map.Entry<K, V>> picked = new Reference<>();
+
+        // the boolean outcome is irrelevant here: 'picked' stays unset unless the cursor exits
+        selectR(root.left, -1, key, cursor, picked);
+
+        return picked.get();
+    }
+
+    /**
+     * This is equivalent to the other {@link #selectR(TrieEntry, int,
+     * K, Cursor, Reference)} method but without its overhead
+     * because we're selecting only one best matching Entry from the
+     * {@link Trie}.
+     */
+    private boolean selectR(TrieEntry<K, V> h, int bitIndex, final K key, 
final Reference<Map.Entry<K, V>> reference)
+    {
+        if (h.bitIndex <= bitIndex)
+        {
+            // If we hit the root Node and it is empty
+            // we have to look for an alternative best
+            // matching node.
+            if (!h.isEmpty())
+            {
+                reference.set(h);
+                return false;
+            }
+            return true;
+        }
+
+        if (!isBitSet(key, h.bitIndex))
+        {
+            if (selectR(h.left, h.bitIndex, key, reference))
+            {
+                return selectR(h.right, h.bitIndex, key, reference);
+            }
+        }
+        else
+        {
+            if (selectR(h.right, h.bitIndex, key, reference))
+            {
+                return selectR(h.left, h.bitIndex, key, reference);
+            }
+        }
+
+        return false;
+    }
+    
+    /**
+     * Recursive selection driven by a {@link Cursor}: every non-empty entry
+     * reached through an uplink is offered to the cursor, whose
+     * {@link Decision} determines whether the search goes on.
+     *
+     * @return {@code true} if the caller should keep looking,
+     *         {@code false} once the cursor asked to exit
+     * @throws UnsupportedOperationException if the cursor answers {@code REMOVE}
+     */
+    private boolean selectR(TrieEntry<K,V> h, int bitIndex, 
+                            final K key, final Cursor<? super K, ? super V> 
cursor,
+                            final Reference<Map.Entry<K, V>> reference)
+    {
+        // an uplink (or the root) ends the descent
+        if (h.bitIndex <= bitIndex)
+        {
+            if (!h.isEmpty())
+            {
+                Decision decision = cursor.select(h);
+                switch(decision)
+                {
+                    case REMOVE:
+                        throw new UnsupportedOperationException("Cannot remove 
during select");
+
+                    case EXIT:
+                        reference.set(h);
+                        return false; // exit
+
+                    case REMOVE_AND_EXIT:
+                        // hand back a detached copy, because removeEntry clears h's key and value
+                        TrieEntry<K, V> entry = new TrieEntry<>(h.getKey(), 
h.getValue(), -1);
+                        reference.set(entry);
+                        removeEntry(h);
+                        return false;
+
+                    case CONTINUE:
+                        // fall through.
+                }
+            }
+
+            return true; // continue
+        }
+
+        // visit the side the key's bit points to first, the other side only if asked to continue
+        if (!isBitSet(key, h.bitIndex))
+        {
+            if (selectR(h.left, h.bitIndex, key, cursor, reference))
+            {
+                return selectR(h.right, h.bitIndex, key, cursor, reference);
+            }
+        }
+        else
+        {
+            if (selectR(h.right, h.bitIndex, key, cursor, reference))
+            {
+                return selectR(h.left, h.bitIndex, key, cursor, reference);
+            }
+        }
+        
+        return false;
+    }
+    
+    /**
+     * Offers every entry, in iteration order, to the given {@link Cursor}
+     * and acts on the {@link Decision} it returns.
+     *
+     * @return the entry the cursor exited on (a detached copy for
+     *         {@code REMOVE_AND_EXIT}), or {@code null} if the cursor
+     *         never asked to exit
+     */
+    @Override
+    public Map.Entry<K, V> traverse(Cursor<? super K, ? super V> cursor)
+    {
+        TrieEntry<K, V> entry = nextEntry(null);
+        while (entry != null)
+        {
+            TrieEntry<K, V> current = entry;
+            
+            Decision decision = cursor.select(current);
+            // fetch the successor before 'current' is possibly removed below
+            entry = nextEntry(current);
+            
+            switch(decision)
+            {
+                case EXIT:
+                    return current;
+
+                case REMOVE:
+                    removeEntry(current);
+                    break; // out of switch, stay in while loop
+
+                case REMOVE_AND_EXIT:
+                    // copy first: removeEntry clears the key and value of 'current'
+                    Map.Entry<K, V> value = new TrieEntry<>(current.getKey(), 
current.getValue(), -1);
+                    removeEntry(current);
+                    return value;
+
+                case CONTINUE: // do nothing.
+            }
+        }
+        
+        return null;
+    }
+    
+    /**
+     * @return {@code true} if the nearest entry for {@code k} holds a key
+     *         that compares equal to it; {@code false} for a null argument
+     */
+    @Override
+    public boolean containsKey(Object k)
+    {
+        if (k == null)
+            return false;
+
+        final K key = Tries.cast(k);
+        final TrieEntry<K, V> nearest = getNearestEntryForKey(key);
+
+        // an empty nearest entry is the unset root
+        if (nearest.isEmpty())
+            return false;
+
+        return compareKeys(key, nearest.key);
+    }
+    
+    /**
+     * @return the entry view of this trie, created on first use
+     */
+    @Override
+    public Set<Map.Entry<K,V>> entrySet()
+    {
+        Set<Map.Entry<K,V>> view = entrySet;
+        if (view == null)
+        {
+            view = new EntrySet();
+            entrySet = view;
+        }
+
+        return view;
+    }
+    
+    /**
+     * @return the key view of this trie, created on first use
+     */
+    @Override
+    public Set<K> keySet()
+    {
+        Set<K> view = keySet;
+        if (view == null)
+        {
+            view = new KeySet();
+            keySet = view;
+        }
+
+        return view;
+    }
+    
+    /**
+     * @return the value view of this trie, created on first use
+     */
+    @Override
+    public Collection<V> values()
+    {
+        Collection<V> view = values;
+        if (view == null)
+        {
+            view = new Values();
+            values = view;
+        }
+
+        return view;
+    }
+    
+    /**
+     * {@inheritDoc}
+     * 
+     * @throws ClassCastException if provided key is of an incompatible type 
+     */
+    @Override
+    public V remove(Object k)
+    {
+        if (k == null)
+            return null;
+        
+        K key = Tries.cast(k);
+        TrieEntry<K, V> current = root.left;
+        TrieEntry<K, V> path = root;
+        while (true)
+        {
+            if (current.bitIndex <= path.bitIndex)
+            {
+                if (!current.isEmpty() && compareKeys(key, current.key))
+                {
+                    return removeEntry(current);
+                }
+                else
+                {
+                    return null;
+                }
+            }
+            
+            path = current;
+            current = !isBitSet(key, current.bitIndex) ? current.left : 
current.right;
+        }
+    }
+    
+    /**
+     * Returns the nearest entry for a given key. This is useful for
+     * finding out whether a given key exists (and finding the value
+     * for it), or for inserting the key.
+     *
+     * This is the actual get implementation. Unlike selectR it may
+     * return the root entry even if that entry is empty.
+     */
+    TrieEntry<K, V> getNearestEntryForKey(K key)
+    {
+        TrieEntry<K, V> above = root;
+        TrieEntry<K, V> node = root.left;
+
+        // follow the key's bits until a link stops leading further down
+        while (node.bitIndex > above.bitIndex)
+        {
+            above = node;
+            node = isBitSet(key, node.bitIndex) ? node.right : node.left;
+        }
+
+        return node;
+    }
+    
+    /**
+     * Removes a single entry from the {@link Trie}.
+     *
+     * Any entry other than the root is unlinked first, through the internal
+     * (hard) or external (easy) removal routine depending on its shape. The
+     * root is never unlinked; in every case the entry's key and value are cleared.
+     *
+     * @return whatever {@code setKeyValue(null, null)} returns for the entry
+     */
+    V removeEntry(TrieEntry<K, V> h)
+    {
+        if (h != root)
+        {
+            if (!h.isInternalNode())
+                removeExternalEntry(h);
+            else
+                removeInternalEntry(h);
+        }
+
+        decrementSize();
+        return h.setKeyValue(null, null);
+    }
+    
+    /**
+     * Removes an external entry from the {@link Trie}.
+     * 
+     * If it's an external Entry then just remove it.
+     * This is very easy and straight forward.
+     */
+    private void removeExternalEntry(TrieEntry<K, V> h)
+    {
+        if (h == root)
+        {
+            throw new IllegalArgumentException("Cannot delete root Entry!");
+        }
+        else if (!h.isExternalNode())
+        {
+            throw new IllegalArgumentException(h + " is not an external 
Entry!");
+        } 
+        
+        TrieEntry<K, V> parent = h.parent;
+        TrieEntry<K, V> child = (h.left == h) ? h.right : h.left;
+        
+        if (parent.left == h)
+        {
+            parent.left = child;
+        }
+        else
+        {
+            parent.right = child;
+        }
+        
+        // either the parent is changing, or the predecessor is changing.
+        if (child.bitIndex > parent.bitIndex)
+        {
+            child.parent = parent;
+        }
+        else
+        {
+            child.predecessor = parent;
+        }
+        
+    }
+    
+    /**
+     * Removes an internal entry from the {@link Trie}.
+     * 
+     * The idea is essentially that Entry p (h's predecessor) takes Entry h's
+     * place in the trie, which requires some re-wiring: p is first unlinked
+     * from its own position, then given h's bit index, parent and children.
+     *
+     * @throws IllegalArgumentException if {@code h} is the root or is not internal
+     */
+    private void removeInternalEntry(TrieEntry<K, V> h)
+    {
+        if (h == root)
+        {
+            throw new IllegalArgumentException("Cannot delete root Entry!");
+        }
+        else if (!h.isInternalNode())
+        {
+            throw new IllegalArgumentException(h + " is not an internal 
Entry!");
+        } 
+        
+        TrieEntry<K, V> p = h.predecessor;
+        
+        // Set P's bitIndex
+        p.bitIndex = h.bitIndex;
+        
+        // Fix P's parent, predecessor and child Nodes
+        {
+            TrieEntry<K, V> parent = p.parent;
+            // the link of p that does not point up to h
+            TrieEntry<K, V> child = (p.left == h) ? p.right : p.left;
+            
+            // if it was looping to itself previously,
+            // it will now be pointed from its parent
+            // (if we aren't removing its parent --
+            //  in that case, it remains looping to itself).
+            // otherwise, it will continue to have the same
+            // predecessor.
+            if (p.predecessor == p && p.parent != h)
+                p.predecessor = p.parent;
+            
+            // p's old parent adopts p's remaining link
+            if (parent.left == p)
+            {
+                parent.left = child;
+            }
+            else
+            {
+                parent.right = child;
+            }
+            
+            // only a real child (not an uplink) gets its parent pointer updated
+            if (child.bitIndex > parent.bitIndex)
+            {
+                child.parent = parent;
+            }
+        }
+        
+        // Fix H's parent and child Nodes
+        {         
+            // If H is a parent of its left and right child 
+            // then change them to P
+            if (h.left.parent == h)
+                h.left.parent = p;
+
+            if (h.right.parent == h)
+                h.right.parent = p;
+            
+            // Change H's parent
+            if (h.parent.left == h)
+            {
+                h.parent.left = p;
+            }
+            else
+            {
+                h.parent.right = p;
+            }
+        }
+        
+        // Copy the remaining fields from H to P
+        // (bitIndex was already copied at the top of the method)
+        p.parent = h.parent;
+        p.left = h.left;
+        p.right = h.right;
+        
+        // Make sure that if h was pointing to any uplinks,
+        // p now points to them.
+        if (isValidUplink(p.left, p))
+            p.left.predecessor = p;
+        
+        if (isValidUplink(p.right, p))
+            p.right.predecessor = p;
+    }
+    
+    /**
+     * Returns the entry that follows the given one in iteration order.
+     * Passing {@code null} asks for the first entry.
+     */
+    TrieEntry<K, V> nextEntry(TrieEntry<K, V> node)
+    {
+        if (node == null)
+            return firstEntry();
+
+        // resume scanning from the entry that uplinks to 'node'
+        return nextEntryImpl(node.predecessor, node, null);
+    }
+    
+    /**
+     * Scans for the next node, starting at the specified point, and using
+     * 'previous' as a hint that the last node we returned was 'previous' (so
+     * we know not to return it again).  If 'tree' is non-null, this will limit
+     * the search to the given tree.
+     * 
+     * The basic premise is that each iteration can follow the following steps:
+     * 
+     * 1) Scan all the way to the left.
+     *   a) If we already started from this node last time, proceed to Step 2.
+     *   b) If a valid uplink is found, use it.
+     *   c) If the result is an empty node (root not set), break the scan.
+     *   d) If we already returned the left node, break the scan.
+     *   
+     * 2) Check the right.
+     *   a) If we already returned the right node, proceed to Step 3.
+     *   b) If it is a valid uplink, use it.
+     *   c) Do Step 1 from the right node.
+     *   
+     * 3) Back up through the parents until we find a parent
+     *    that we're not the right child of.
+     *    
+     * 4) If there's no right child of that parent, the iteration is finished.
+     *    Otherwise continue to Step 5.
+     * 
+     * 5) Check to see if the right child is a valid uplink.
+     *    a) If we already returned that child, proceed to Step 6.
+     *       Otherwise, use it.
+     *    
+     * 6) If the right child of the parent is the parent itself, we've
+     *    already found & returned the end of the Trie, so exit.
+     *    
+     * 7) Do Step 1 on the parent's right child.
+     *
+     * @return the next entry, or {@code null} when the scan is exhausted
+     */
+    TrieEntry<K, V> nextEntryImpl(TrieEntry<K, V> start, TrieEntry<K, V> 
previous, TrieEntry<K, V> tree)
+    {
+        TrieEntry<K, V> current = start;
+
+        // Only look at the left if this was a recursive or
+        // the first check, otherwise we know we've already looked
+        // at the left.
+        if (previous == null || start != previous.predecessor)
+        {
+            while (!current.left.isEmpty())
+            {
+                // stop traversing if we've already
+                // returned the left of this node.
+                if (previous == current.left)
+                    break;
+                
+                if (isValidUplink(current.left, current))
+                    return current.left;
+                
+                current = current.left;
+            }
+        }
+        
+        // If there's no data at all, exit.
+        if (current.isEmpty())
+            return null;
+        
+        // If we've already returned the left,
+        // and the immediate right is null,
+        // there's only one entry in the Trie
+        // which is stored at the root.
+        //
+        //  / ("")   <-- root
+        //  \_/  \
+        //       null <-- 'current'
+        //
+        if (current.right == null)
+            return null;
+        
+        // If nothing valid on the left, try the right.
+        if (previous != current.right)
+        {
+            // See if it immediately is valid.
+            if (isValidUplink(current.right, current))
+                return current.right;
+            
+            // Must search on the right's side if it wasn't initially valid.
+            return nextEntryImpl(current.right, previous, tree);
+        }
+        
+        // Neither left nor right are valid, find the first parent
+        // whose child did not come from the right & traverse it.
+        while (current == current.parent.right)
+        {
+            // If we're going to traverse to above the subtree, stop.
+            if (current == tree)
+                return null;
+            
+            current = current.parent;
+        }
+
+        // If we're on the top of the subtree, we can't go any higher.
+        if (current == tree)
+            return null;
+        
+        // If there's no right, the parent must be root, so we're done.
+        if (current.parent.right == null)
+            return null;
+        
+        // If the parent's right is a valid uplink we haven't returned yet, we've found one.
+        if (previous != current.parent.right && 
isValidUplink(current.parent.right, current.parent))
+            return current.parent.right;
+        
+        // If the parent's right is itself, there can't be any more nodes.
+        if (current.parent.right == current.parent)
+            return null;
+        
+        // We need to traverse down the parent's right's path.
+        return nextEntryImpl(current.parent.right, previous, tree);
+    }
+    
+    /**
+     * Returns the first entry the {@link Trie} is storing, or
+     * {@code null} if the trie is empty.
+     *
+     * The entry is located by always going left from the root
+     * until an uplink is reached; that uplink is the first key.
+     */
+    TrieEntry<K, V> firstEntry()
+    {
+        // an empty Trie has no first node
+        if (isEmpty())
+            return null;
+
+        return followLeft(root);
+    }
+    
+    /** 
+     * Goes left through the tree until it finds a valid node. 
+     */
+    TrieEntry<K, V> followLeft(TrieEntry<K, V> node)
+    {
+        while(true)
+        {
+            TrieEntry<K, V> child = node.left;
+            // if we hit root and it didn't have a node, go right instead.
+            if (child.isEmpty())
+                child = node.right;
+            
+            if (child.bitIndex <= node.bitIndex)
+                return child;
+            
+            node = child;
+        }
+    }
+    
+    /**
+     * Tells whether 'next' is a valid uplink coming from 'from': it must
+     * exist, must not have a larger bit index than 'from', and must hold a key.
+     */
+    static boolean isValidUplink(TrieEntry<?, ?> next, TrieEntry<?, ?> from)
+    {
+        if (next == null)
+            return false;
+
+        if (next.bitIndex > from.bitIndex)
+            return false;
+
+        return !next.isEmpty();
+    }
+    
+    /**
+     * A mutable single-slot holder that lets a method hand a result back
+     * through its argument list. A one-element array would do the same job
+     * but leads to compiler warnings.
+     */
+    private static class Reference<E>
+    {
+        /** The held object; stays null until {@link #set(Object)} is called. */
+        private E held;
+
+        public E get()
+        {
+            return held;
+        }
+
+        public void set(E item)
+        {
+            held = item;
+        }
+    }
+    
+    /**
+     * The node type a {@link Trie} is built from: a key/value pair plus
+     * the bit index it branches on and its links to neighbouring nodes.
+     */
+    static class TrieEntry<K,V> extends BasicEntry<K, V>
+    {
+        private static final long serialVersionUID = 4596023148184140013L;
+
+        /** The index this entry is comparing. */
+        protected int bitIndex;
+
+        /** The parent of this entry. */
+        protected TrieEntry<K,V> parent;
+
+        /** The left child of this entry. */
+        protected TrieEntry<K,V> left;
+
+        /** The right child of this entry. */
+        protected TrieEntry<K,V> right;
+
+        /** The entry who uplinks to this entry. */
+        protected TrieEntry<K,V> predecessor;
+
+        /**
+         * Creates an unlinked entry: no parent, no right link, and both
+         * the left link and the predecessor looping back to the entry itself.
+         */
+        public TrieEntry(K key, V value, int bitIndex)
+        {
+            super(key, value);
+
+            this.bitIndex = bitIndex;
+
+            // self references
+            this.left = this;
+            this.predecessor = this;
+
+            // nothing attached yet
+            this.parent = null;
+            this.right = null;
+        }
+
+        /**
+         * Tells whether this entry is storing a key. Only the root
+         * may be empty; every other node must have a key.
+         */
+        public boolean isEmpty()
+        {
+            return null == key;
+        }
+
+        /**
+         * True if neither link loops back to this entry.
+         */
+        public boolean isInternalNode()
+        {
+            return !(left == this || right == this);
+        }
+
+        /**
+         * True if at least one link loops back to this entry.
+         */
+        public boolean isExternalNode()
+        {
+            return !isInternalNode();
+        }
+    }
+    
+
+    /**
+     * This is an entry set view of the {@link Trie} as returned
+     * by {@link Map#entrySet()}. All operations delegate to the
+     * enclosing trie.
+     */
+    private class EntrySet extends AbstractSet<Map.Entry<K,V>>
+    {
+        @Override
+        public Iterator<Map.Entry<K,V>> iterator()
+        {
+            return new EntryIterator();
+        }
+        
+        /**
+         * @return {@code true} if {@code o} is a {@link Map.Entry} whose key is
+         *         present in the trie and the stored entry reports itself
+         *         equal to {@code o}
+         */
+        @Override
+        public boolean contains(Object o)
+        {
+            if (!(o instanceof Map.Entry))
+                return false;
+            
+            TrieEntry<K,V> candidate = getEntry(((Map.Entry<?, ?>)o).getKey());
+            return candidate != null && candidate.equals(o);
+        }
+        
+        /**
+         * Removes the mapping described by the given entry.
+         *
+         * The trie is keyed by K, so the removal has to go through the
+         * entry's key; handing the entry itself to the trie's
+         * {@code remove} would treat the entry as if it were a key.
+         *
+         * @return {@code true} if the entry was present and has been removed
+         */
+        @Override
+        public boolean remove(Object o)
+        {
+            // contains() already rejects anything that is not a Map.Entry
+            // and entries the stored entry does not consider equal
+            if (!contains(o))
+                return false;
+            
+            AbstractPatriciaTrie.this.remove(((Map.Entry<?, ?>) o).getKey());
+            return true;
+        }
+        
+        @Override
+        public int size()
+        {
+            return AbstractPatriciaTrie.this.size();
+        }
+        
+        @Override
+        public void clear()
+        {
+            AbstractPatriciaTrie.this.clear();
+        }
+        
+        /**
+         * An {@link Iterator} that returns {@link Map.Entry} Objects
+         */
+        private class EntryIterator extends TrieIterator<Map.Entry<K,V>>
+        {
+            @Override
+            public Map.Entry<K,V> next()
+            {
+                return nextEntry();
+            }
+        }
+    }
+    
+    /**
+     * The key view of the {@link Trie}, as handed out by
+     * {@link Map#keySet()}. Every operation delegates to the enclosing trie.
+     */
+    private class KeySet extends AbstractSet<K>
+    {
+        @Override
+        public int size()
+        {
+            return AbstractPatriciaTrie.this.size();
+        }
+
+        @Override
+        public void clear()
+        {
+            AbstractPatriciaTrie.this.clear();
+        }
+
+        @Override
+        public boolean contains(Object o)
+        {
+            return AbstractPatriciaTrie.this.containsKey(o);
+        }
+
+        /**
+         * @return {@code true} if removing the key changed the size of the trie
+         */
+        @Override
+        public boolean remove(Object o)
+        {
+            final int before = size();
+            AbstractPatriciaTrie.this.remove(o);
+            return size() != before;
+        }
+
+        @Override
+        public Iterator<K> iterator()
+        {
+            return new KeyIterator();
+        }
+
+        /**
+         * An {@link Iterator} over the keys of the trie
+         */
+        private class KeyIterator extends TrieIterator<K>
+        {
+            @Override
+            public K next()
+            {
+                TrieEntry<K, V> entry = nextEntry();
+                return entry.getKey();
+            }
+        }
+    }
+    
+    /**
+     * The value view of the {@link Trie}, as handed out by
+     * {@link Map#values()}. Every operation delegates to the enclosing trie.
+     */
+    private class Values extends AbstractCollection<V>
+    {
+        @Override
+        public int size()
+        {
+            return AbstractPatriciaTrie.this.size();
+        }
+
+        @Override
+        public void clear()
+        {
+            AbstractPatriciaTrie.this.clear();
+        }
+
+        @Override
+        public boolean contains(Object o)
+        {
+            return AbstractPatriciaTrie.this.containsValue(o);
+        }
+
+        /**
+         * Removes the first value, in iteration order, that
+         * {@code Tries.areEqual} matches against {@code o}.
+         *
+         * @return {@code true} if such a value was found and removed
+         */
+        @Override
+        public boolean remove(Object o)
+        {
+            Iterator<V> cursor = iterator();
+            while (cursor.hasNext())
+            {
+                if (!Tries.areEqual(cursor.next(), o))
+                    continue;
+
+                cursor.remove();
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        public Iterator<V> iterator()
+        {
+            return new ValueIterator();
+        }
+
+        /**
+         * An {@link Iterator} over the values of the trie
+         */
+        private class ValueIterator extends TrieIterator<V>
+        {
+            @Override
+            public V next()
+            {
+                TrieEntry<K, V> entry = nextEntry();
+                return entry.getValue();
+            }
+        }
+    }
+    
+    /** 
+     * Base class of the iterators handed out by the views of this trie.
+     * Subclasses decide what {@code next()} exposes of each visited entry.
+     */
+    abstract class TrieIterator<E> implements Iterator<E>
+    {
+        /**
+         * For fail-fast behaviour: the trie's modCount this iterator expects to see
+         */
+        protected int expectedModCount = AbstractPatriciaTrie.this.modCount;
+        
+        protected TrieEntry<K, V> next; // the next node to return
+        protected TrieEntry<K, V> current; // the current entry we're on
+        
+        /**
+         * Starts iteration from the root
+         */
+        protected TrieIterator()
+        {
+            next = AbstractPatriciaTrie.this.nextEntry(null);
+        }
+        
+        /**
+         * Starts iteration at the given entry
+         */
+        protected TrieIterator(TrieEntry<K, V> firstEntry)
+        {
+            next = firstEntry;
+        }
+        
+        /**
+         * Returns the next {@link TrieEntry} and advances the iterator.
+         *
+         * @throws ConcurrentModificationException if the trie's modCount
+         *         differs from the one this iterator expects
+         * @throws NoSuchElementException if there is no further entry
+         */
+        protected TrieEntry<K,V> nextEntry()
+        {
+            if (expectedModCount != AbstractPatriciaTrie.this.modCount)
+                throw new ConcurrentModificationException();
+            
+            TrieEntry<K,V> e = next;
+            if (e == null)
+                throw new NoSuchElementException();
+            
+            // look ahead first, then remember e as the entry remove() acts on
+            next = findNext(e);
+            current = e;
+            return e;
+        }
+        
+        /**
+         * Finds the entry that follows {@code prior}.
+         *
+         * @see AbstractPatriciaTrie#nextEntry(TrieEntry)
+         */
+        protected TrieEntry<K, V> findNext(TrieEntry<K, V> prior)
+        {
+            return AbstractPatriciaTrie.this.nextEntry(prior);
+        }
+        
+        @Override
+        public boolean hasNext()
+        {
+            return next != null;
+        }
+        
+        /**
+         * Removes the entry returned by the most recent call to nextEntry().
+         *
+         * @throws IllegalStateException if there is no such entry, or it was already removed
+         * @throws ConcurrentModificationException if the trie's modCount
+         *         differs from the one this iterator expects
+         */
+        @Override
+        public void remove()
+        {
+            if (current == null)
+                throw new IllegalStateException();
+            
+            if (expectedModCount != AbstractPatriciaTrie.this.modCount)
+                throw new ConcurrentModificationException();
+            
+            TrieEntry<K, V> node = current;
+            current = null;
+            AbstractPatriciaTrie.this.removeEntry(node);
+            
+            // our own removal bumped modCount; resynchronize so iteration can go on
+            expectedModCount = AbstractPatriciaTrie.this.modCount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractTrie.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractTrie.java 
b/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractTrie.java
new file mode 100644
index 0000000..0bf9c20
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/trie/AbstractTrie.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2005-2010 Roger Kapsi, Sam Berlin
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils.trie;
+
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Map;
+
+/**
+ * This class is taken from https://github.com/rkapsi/patricia-trie (v0.6) and slightly modified
+ * to correspond to Cassandra code style. It is the only Patricia Trie implementation
+ * which supports pluggable key comparators (e.g. the commons-collections PatriciaTrie, which is
+ * based on the rkapsi/patricia-trie project, only supports String keys),
+ * but unfortunately it is not deployed to Maven Central as a downloadable artifact.
+ */
+
+/**
+ * This class provides some basic {@link Trie} functionality and 
+ * utility methods for actual {@link Trie} implementations.
+ */
+abstract class AbstractTrie<K, V> extends AbstractMap<K, V> implements Serializable, Trie<K, V>
+{
+    private static final long serialVersionUID = -6358111100045408883L;
+
+    /**
+     * The {@link KeyAnalyzer} that's being used to build the
+     * PATRICIA {@link Trie}
+     */
+    protected final KeyAnalyzer<? super K> keyAnalyzer;
+
+    /**
+     * Constructs a new {@link Trie} using the given {@link KeyAnalyzer}
+     *
+     * @param keyAnalyzer the analyzer to use; it is passed through
+     *                    {@code Tries.notNull} before being stored
+     */
+    public AbstractTrie(KeyAnalyzer<? super K> keyAnalyzer)
+    {
+        this.keyAnalyzer = Tries.notNull(keyAnalyzer, "keyAnalyzer");
+    }
+
+    /**
+     * Returns the key of the entry chosen by {@code select(key)},
+     * or {@code null} if no entry was selected.
+     */
+    @Override
+    public K selectKey(K key)
+    {
+        Map.Entry<K, V> entry = select(key);
+        return entry != null ? entry.getKey() : null;
+    }
+
+    /**
+     * Returns the value of the entry chosen by {@code select(key)},
+     * or {@code null} if no entry was selected.
+     */
+    @Override
+    public V selectValue(K key)
+    {
+        Map.Entry<K, V> entry = select(key);
+        return entry != null ? entry.getValue() : null;
+    }
+
+    /**
+     * Renders the trie as a header line with its size followed by
+     * one indented line per entry.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("Trie[").append(size()).append("]={\n");
+        for (Map.Entry<K, V> entry : entrySet())
+        {
+            buffer.append("  ").append(entry).append("\n");
+        }
+        buffer.append("}\n");
+        return buffer.toString();
+    }
+
+    /**
+     * Returns the length of the given key in bits, or 0 if the key is null.
+     *
+     * @see KeyAnalyzer#lengthInBits(Object)
+     */
+    final int lengthInBits(K key)
+    {
+        return key == null ? 0 : keyAnalyzer.lengthInBits(key);
+    }
+
+    /**
+     * Returns whether or not the given bit on the
+     * key is set or false if the key is null.
+     *
+     * @see KeyAnalyzer#isBitSet(Object, int)
+     */
+    final boolean isBitSet(K key, int bitIndex)
+    {
+        return key != null && keyAnalyzer.isBitSet(key, bitIndex);
+    }
+
+    /**
+     * Utility method for calling {@link KeyAnalyzer#bitIndex(Object, Object)}.
+     *
+     * If exactly one of the keys is null, the index of the first set bit of the
+     * other key is returned instead; if both are null (or the non-null key has
+     * no set bit) the result is {@link KeyAnalyzer#NULL_BIT_KEY}.
+     */
+    final int bitIndex(K key, K otherKey)
+    {
+        if (key != null && otherKey != null)
+        {
+            return keyAnalyzer.bitIndex(key, otherKey);
+        }
+        else if (key != null)
+        {
+            return bitIndex(key);
+        }
+        else if (otherKey != null)
+        {
+            return bitIndex(otherKey);
+        }
+
+        return KeyAnalyzer.NULL_BIT_KEY;
+    }
+
+    /**
+     * Returns the index of the first set bit of the given key, or
+     * {@link KeyAnalyzer#NULL_BIT_KEY} if the key has no set bit.
+     */
+    private int bitIndex(K key)
+    {
+        int lengthInBits = lengthInBits(key);
+        for (int i = 0; i < lengthInBits; i++)
+        {
+            if (isBitSet(key, i))
+                return i;
+        }
+
+        return KeyAnalyzer.NULL_BIT_KEY;
+    }
+
+    /**
+     * A utility method for calling {@link KeyAnalyzer#compare(Object, Object)}.
+     *
+     * @return {@code true} if both keys are null, or both are non-null and the
+     *         analyzer compares them as equal; {@code false} otherwise
+     */
+    final boolean compareKeys(K key, K other)
+    {
+        if (key == null)
+        {
+            return (other == null);
+        }
+        else if (other == null)
+        {
+            return false;
+        }
+
+        return keyAnalyzer.compare(key, other) == 0;
+    }
+
+    /**
+     * A basic implementation of {@link Entry}
+     */
+    abstract static class BasicEntry<K, V> implements Map.Entry<K, V>, Serializable
+    {
+        private static final long serialVersionUID = -944364551314110330L;
+
+        protected K key;
+
+        protected V value;
+
+        public BasicEntry(K key, V value)
+        {
+            this.key = key;
+            this.value = value;
+        }
+
+        /**
+         * Replaces the current key and value with the provided
+         * key &amp; value
+         *
+         * @return the value that was stored before the replacement
+         */
+        public V setKeyValue(K key, V value)
+        {
+            this.key = key;
+            return setValue(value);
+        }
+
+        @Override
+        public K getKey()
+        {
+            return key;
+        }
+
+        @Override
+        public V getValue()
+        {
+            return value;
+        }
+
+        /**
+         * Stores the new value and returns the one it replaced.
+         */
+        @Override
+        public V setValue(V value)
+        {
+            V previous = this.value;
+            this.value = value;
+            return previous;
+        }
+
+        /**
+         * Computes the hash code as specified by {@link Map.Entry#hashCode()}:
+         * the XOR of the key's and the value's hash codes, with {@code null}
+         * contributing {@code 0}. This keeps the hash code of the enclosing map
+         * (a sum of entry hash codes in {@link AbstractMap#hashCode()}) equal to
+         * that of any other {@link Map} with the same mappings.
+         *
+         * The result is deliberately not cached because both key and value
+         * can be replaced after construction.
+         */
+        @Override
+        public int hashCode()
+        {
+            int keyHash = (key != null ? key.hashCode() : 0);
+            int valueHash = (value != null ? value.hashCode() : 0);
+            return keyHash ^ valueHash;
+        }
+
+        /**
+         * Two entries are equal when both their keys and their values are
+         * equal according to {@code Tries.areEqual}.
+         */
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == this)
+            {
+                return true;
+            }
+            else if (!(o instanceof Map.Entry<?, ?>))
+            {
+                return false;
+            }
+
+            Map.Entry<?, ?> other = (Map.Entry<?, ?>)o;
+            return Tries.areEqual(key, other.getKey()) && Tries.areEqual(value, other.getValue());
+        }
+
+        @Override
+        public String toString()
+        {
+            return key + "=" + value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/trie/Cursor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/trie/Cursor.java 
b/src/java/org/apache/cassandra/index/sasi/utils/trie/Cursor.java
new file mode 100644
index 0000000..513fae0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/trie/Cursor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2005-2010 Roger Kapsi, Sam Berlin
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils.trie;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This class is taken from https://github.com/rkapsi/patricia-trie (v0.6) and slightly modified
+ * to correspond to Cassandra code style. It is the only Patricia Trie implementation
+ * which supports pluggable key comparators (e.g. the commons-collections PatriciaTrie, which is
+ * based on the rkapsi/patricia-trie project, only supports String keys),
+ * but unfortunately it is not deployed to Maven Central as a downloadable artifact.
+ */
+
+/**
+ * A {@link Cursor} can be used to traverse a {@link Trie}, visit each node 
+ * step by step and make {@link Decision}s on each step how to continue with 
+ * traversing the {@link Trie}.
+ */
+public interface Cursor<K, V>
+{
+    
+    /**
+     * The {@link Decision} tells the {@link Cursor} what to do on each step 
+     * while traversing the {@link Trie}.
+     * 
+     * NOTE: Not all operations that work with a {@link Cursor} support all 
+     * {@link Decision} types.
+     */
+    enum Decision
+    {
+        
+        /**
+         * Exit the traversal.
+         */
+        EXIT, 
+        
+        /**
+         * Continue with the traversal.
+         */
+        CONTINUE, 
+        
+        /**
+         * Remove the previously returned element
+         * from the {@link Trie} and continue.
+         */
+        REMOVE, 
+        
+        /**
+         * Remove the previously returned element
+         * from the {@link Trie} and exit from the
+         * traversal.
+         */
+        REMOVE_AND_EXIT
+    }
+    
+    /**
+     * Called for each {@link Entry} in the {@link Trie}. Return 
+     * {@link Decision#EXIT} to finish the {@link Trie} operation,
+     * {@link Decision#CONTINUE} to go to the next {@link Entry},
+     * {@link Decision#REMOVE} to remove the {@link Entry} and
+     * continue iterating or {@link Decision#REMOVE_AND_EXIT} to
+     * remove the {@link Entry} and stop iterating.
+     * 
+     * Note: Not all operations support {@link Decision#REMOVE}.
+     *
+     * @param entry the entry currently being visited
+     * @return the {@link Decision} telling the traversal how to proceed
+     */
+    Decision select(Map.Entry<? extends K, ? extends V> entry);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/trie/KeyAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/sasi/utils/trie/KeyAnalyzer.java 
b/src/java/org/apache/cassandra/index/sasi/utils/trie/KeyAnalyzer.java
new file mode 100644
index 0000000..9cab4ae
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/trie/KeyAnalyzer.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Roger Kapsi
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils.trie;
+
+import java.util.Comparator;
+
+/**
+ * This class is taken from https://github.com/rkapsi/patricia-trie (v0.6) and slightly modified
+ * to correspond to Cassandra code style. It is the only Patricia Trie implementation
+ * which supports pluggable key comparators (e.g. the commons-collections PatriciaTrie, which is
+ * based on the rkapsi/patricia-trie project, only supports String keys),
+ * but unfortunately it is not deployed to Maven Central as a downloadable artifact.
+ */
+
+/**
+ * The {@link KeyAnalyzer} provides bit-level access to keys
+ * for the {@link PatriciaTrie}.
+ */
+public interface KeyAnalyzer<K> extends Comparator<K>
+{
+    /**
+     * Returned by {@link #bitIndex(Object, Object)} if a key's
+     * bits were all zero (0).
+     */
+    int NULL_BIT_KEY = -1;
+    
+    /** 
+     * Returned by {@link #bitIndex(Object, Object)} if the
+     * bits of two keys were all equal.
+     */
+    int EQUAL_BIT_KEY = -2;
+    
+    /**
+     * Returned by {@link #bitIndex(Object, Object)} if a key's 
+     * indices are out of bounds.
+     */
+    int OUT_OF_BOUNDS_BIT_KEY = -3;
+    
+    /**
+     * Returns the key's length in bits.
+     *
+     * @param key the key to measure
+     * @return the number of bits in {@code key}
+     */
+    int lengthInBits(K key);
+    
+    /**
+     * Returns {@code true} if a key's bit is set at the given index.
+     *
+     * @param key the key to inspect
+     * @param bitIndex the index of the bit to test
+     * @return {@code true} if the bit at {@code bitIndex} is set
+     */
+    boolean isBitSet(K key, int bitIndex);
+    
+    /**
+     * Returns the index of the first bit that is different in the two keys.
+     * The negative constants {@link #NULL_BIT_KEY}, {@link #EQUAL_BIT_KEY} and
+     * {@link #OUT_OF_BOUNDS_BIT_KEY} are documented as special results of this method.
+     *
+     * @param key the first key
+     * @param otherKey the key to compare against
+     * @return the index of the first differing bit, or one of the special constants
+     */
+    int bitIndex(K key, K otherKey);
+    
+    /**
+     * Returns {@code true} if the second argument is a 
+     * prefix of the first argument.
+     *
+     * @param key the key to test
+     * @param prefix the candidate prefix
+     * @return {@code true} if {@code prefix} is a prefix of {@code key}
+     */
+    boolean isPrefix(K key, K prefix);
+}

Reply via email to