http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/analyzer/NonTokenizingAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/analyzer/NonTokenizingAnalyzerTest.java b/test/unit/org/apache/cassandra/index/sasi/analyzer/NonTokenizingAnalyzerTest.java
new file mode 100644
index 0000000..ba67853
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/analyzer/NonTokenizingAnalyzerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.analyzer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the non-tokenizing analyzer
+ */
+public class NonTokenizingAnalyzerTest
+{
+    @Test
+    public void caseInsensitiveAnalyzer() throws Exception
+    {
+        NonTokenizingAnalyzer analyzer = new NonTokenizingAnalyzer();
+        NonTokenizingOptions options = NonTokenizingOptions.getDefaultOptions();
+        options.setCaseSensitive(false);
+        analyzer.init(options, UTF8Type.instance);
+
+        String testString = "Nip it in the bud";
+        ByteBuffer toAnalyze = ByteBuffer.wrap(testString.getBytes());
+        analyzer.reset(toAnalyze);
+        ByteBuffer analyzed = null;
+        while (analyzer.hasNext())
+            analyzed = analyzer.next();
+        Assert.assertTrue(testString.toLowerCase().equals(ByteBufferUtil.string(analyzed)));
+    }
+
+    @Test
+    public void caseSensitiveAnalyzer() throws Exception
+    {
+        NonTokenizingAnalyzer analyzer = new NonTokenizingAnalyzer();
+        NonTokenizingOptions options = NonTokenizingOptions.getDefaultOptions();
+        analyzer.init(options, UTF8Type.instance);
+
+        String testString = "Nip it in the bud";
+        ByteBuffer toAnalyze = ByteBuffer.wrap(testString.getBytes());
+        analyzer.reset(toAnalyze);
+        ByteBuffer analyzed = null;
+        while (analyzer.hasNext())
+            analyzed = analyzer.next();
+        Assert.assertFalse(testString.toLowerCase().equals(ByteBufferUtil.string(analyzed)));
+    }
+
+    @Test
+    public void ensureIncompatibleInputSkipped() throws Exception
+    {
+        NonTokenizingAnalyzer analyzer = new NonTokenizingAnalyzer();
+        NonTokenizingOptions options = NonTokenizingOptions.getDefaultOptions();
+        analyzer.init(options, Int32Type.instance);
+
+        ByteBuffer toAnalyze = ByteBufferUtil.bytes(1);
+        analyzer.reset(toAnalyze);
+        Assert.assertTrue(!analyzer.hasNext());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/analyzer/StandardAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/analyzer/StandardAnalyzerTest.java b/test/unit/org/apache/cassandra/index/sasi/analyzer/StandardAnalyzerTest.java
new file mode 100644
index 0000000..e307512
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/analyzer/StandardAnalyzerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.analyzer;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StandardAnalyzerTest
+{
+    @Test
+    public void testTokenizationAscii() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/apache_license_header.txt");
+
+        StandardTokenizerOptions options = new StandardTokenizerOptions.OptionsBuilder()
+                .maxTokenLength(5).build();
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(options);
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        tokenizer.reset(is);
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(67, tokens.size());
+    }
+
+    @Test
+    public void testTokenizationLoremIpsum() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/lorem_ipsum.txt");
+
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        tokenizer.reset(is);
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(62, tokens.size());
+
+    }
+
+    @Test
+    public void testTokenizationJaJp1() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/ja_jp_1.txt");
+
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+
+        tokenizer.reset(is);
+        List<ByteBuffer> tokens = new ArrayList<>();
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(210, tokens.size());
+    }
+
+    @Test
+    public void testTokenizationJaJp2() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/ja_jp_2.txt");
+
+        StandardTokenizerOptions options = new StandardTokenizerOptions.OptionsBuilder().stemTerms(true)
+                .ignoreStopTerms(true).alwaysLowerCaseTerms(true).build();
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(options);
+
+        tokenizer.reset(is);
+        List<ByteBuffer> tokens = new ArrayList<>();
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(57, tokens.size());
+    }
+
+    @Test
+    public void testTokenizationRuRu1() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/ru_ru_1.txt");
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        tokenizer.reset(is);
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(456, tokens.size());
+    }
+
+    @Test
+    public void testTokenizationZnTw1() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/zn_tw_1.txt");
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        tokenizer.reset(is);
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(963, tokens.size());
+    }
+
+    @Test
+    public void testTokenizationAdventuresOfHuckFinn() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/adventures_of_huckleberry_finn_mark_twain.txt");
+
+        StandardTokenizerOptions options = new StandardTokenizerOptions.OptionsBuilder().stemTerms(true)
+                .ignoreStopTerms(true).useLocale(Locale.ENGLISH)
+                .alwaysLowerCaseTerms(true).build();
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(options);
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        tokenizer.reset(is);
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(40249, tokens.size());
+    }
+
+    @Test
+    public void tokenizeDomainNamesAndUrls() throws Exception
+    {
+        InputStream is = StandardAnalyzerTest.class.getClassLoader()
+                .getResourceAsStream("tokenization/top_visited_domains.txt");
+
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+        tokenizer.reset(is);
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        while (tokenizer.hasNext())
+            tokens.add(tokenizer.next());
+
+        assertEquals(15, tokens.size());
+    }
+
+    @Test
+    public void testReuseAndResetTokenizerInstance() throws Exception
+    {
+        List<ByteBuffer> bbToTokenize = new ArrayList<>();
+        bbToTokenize.add(ByteBuffer.wrap("Nip it in the bud".getBytes()));
+        bbToTokenize.add(ByteBuffer.wrap("I couldn’t care less".getBytes()));
+        bbToTokenize.add(ByteBuffer.wrap("One and the same".getBytes()));
+        bbToTokenize.add(ByteBuffer.wrap("The squeaky wheel gets the 
grease.".getBytes()));
+        bbToTokenize.add(ByteBuffer.wrap("The pen is mightier than the 
sword.".getBytes()));
+
+        StandardAnalyzer tokenizer = new StandardAnalyzer();
+        tokenizer.init(StandardTokenizerOptions.getDefaultOptions());
+
+        List<ByteBuffer> tokens = new ArrayList<>();
+        for (ByteBuffer bb : bbToTokenize)
+        {
+            tokenizer.reset(bb);
+            while (tokenizer.hasNext())
+                tokens.add(tokenizer.next());
+        }
+        assertEquals(10, tokens.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
new file mode 100644
index 0000000..57629b6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.CombinedTerm;
+import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
+import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.MurmurHash;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class OnDiskIndexTest
+{
+    @Test
+    public void testStringSAConstruction() throws Exception
+    {
+        Map<ByteBuffer, TokenTreeBuilder> data = new HashMap<ByteBuffer, 
TokenTreeBuilder>()
+        {{
+                put(UTF8Type.instance.decompose("scat"), keyBuilder(1L));
+                put(UTF8Type.instance.decompose("mat"),  keyBuilder(2L));
+                put(UTF8Type.instance.decompose("fat"),  keyBuilder(3L));
+                put(UTF8Type.instance.decompose("cat"),  keyBuilder(1L, 4L));
+                put(UTF8Type.instance.decompose("till"), keyBuilder(2L, 6L));
+                put(UTF8Type.instance.decompose("bill"), keyBuilder(5L));
+                put(UTF8Type.instance.decompose("foo"),  keyBuilder(7L));
+                put(UTF8Type.instance.decompose("bar"),  keyBuilder(9L, 10L));
+                put(UTF8Type.instance.decompose("michael"), keyBuilder(11L, 
12L, 1L));
+        }};
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, UTF8Type.instance, OnDiskIndexBuilder.Mode.CONTAINS);
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+            addAll(builder, e.getKey(), e.getValue());
+
+        File index = File.createTempFile("on-disk-sa-string", "db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+
+        // first check if we can find exact matches
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+        {
+            if (UTF8Type.instance.getString(e.getKey()).equals("cat"))
+                continue; // cat is embedded into scat, we'll test it in the next section
+
+            Assert.assertEquals("Key was: " + 
UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), 
convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
+        }
+
+        // check that cat returns positions for scat & cat
+        Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat"))));
+
+        // random suffix queries
+        Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar"))));
+        Assert.assertEquals(convert(1, 2, 3, 4), convert(onDisk.search(expressionFor("at"))));
+        Assert.assertEquals(convert(1, 11, 12), convert(onDisk.search(expressionFor("mic"))));
+        Assert.assertEquals(convert(1, 11, 12), convert(onDisk.search(expressionFor("ae"))));
+        Assert.assertEquals(convert(2, 5, 6), convert(onDisk.search(expressionFor("ll"))));
+        Assert.assertEquals(convert(1, 2, 5, 6, 11, 12), convert(onDisk.search(expressionFor("l"))));
+        Assert.assertEquals(convert(7), convert(onDisk.search(expressionFor("oo"))));
+        Assert.assertEquals(convert(7), convert(onDisk.search(expressionFor("o"))));
+        Assert.assertEquals(convert(1, 2, 3, 4, 6), convert(onDisk.search(expressionFor("t"))));
+
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("hello"))));
+
+        onDisk.close();
+    }
+
+    @Test
+    public void testIntegerSAConstruction() throws Exception
+    {
+        final Map<ByteBuffer, TokenTreeBuilder> data = new HashMap<ByteBuffer, 
TokenTreeBuilder>()
+        {{
+                put(Int32Type.instance.decompose(5),  keyBuilder(1L));
+                put(Int32Type.instance.decompose(7),  keyBuilder(2L));
+                put(Int32Type.instance.decompose(1),  keyBuilder(3L));
+                put(Int32Type.instance.decompose(3),  keyBuilder(1L, 4L));
+                put(Int32Type.instance.decompose(8),  keyBuilder(2L, 6L));
+                put(Int32Type.instance.decompose(10), keyBuilder(5L));
+                put(Int32Type.instance.decompose(6),  keyBuilder(7L));
+                put(Int32Type.instance.decompose(4),  keyBuilder(9L, 10L));
+                put(Int32Type.instance.decompose(0),  keyBuilder(11L, 12L, 1L));
+        }};
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+            addAll(builder, e.getKey(), e.getValue());
+
+        File index = File.createTempFile("on-disk-sa-int", "db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+        {
+            Assert.assertEquals(convert(e.getValue()), convert(onDisk.search(expressionFor(Int32Type.instance, e.getKey()))));
+        }
+
+        List<ByteBuffer> sortedNumbers = new ArrayList<ByteBuffer>()
+        {{
+            addAll(data.keySet().stream().collect(Collectors.toList()));
+        }};
+
+        Collections.sort(sortedNumbers, Int32Type.instance::compare);
+
+        // test full iteration
+        int idx = 0;
+        for (OnDiskIndex.DataTerm term : onDisk)
+        {
+            ByteBuffer number = sortedNumbers.get(idx++);
+            Assert.assertEquals(number, term.getTerm());
+            Assert.assertEquals(convert(data.get(number)), convert(term.getTokens()));
+        }
+
+        // test partial iteration (descending)
+        idx = 3; // start from the 3rd element
+        Iterator<OnDiskIndex.DataTerm> partialIter = 
onDisk.iteratorAt(sortedNumbers.get(idx), OnDiskIndex.IteratorOrder.DESC, true);
+        while (partialIter.hasNext())
+        {
+            OnDiskIndex.DataTerm term = partialIter.next();
+            ByteBuffer number = sortedNumbers.get(idx++);
+
+            Assert.assertEquals(number, term.getTerm());
+            Assert.assertEquals(convert(data.get(number)), convert(term.getTokens()));
+        }
+
+        idx = 3; // start from the 3rd element exclusive
+        partialIter = onDisk.iteratorAt(sortedNumbers.get(idx++), OnDiskIndex.IteratorOrder.DESC, false);
+        while (partialIter.hasNext())
+        {
+            OnDiskIndex.DataTerm term = partialIter.next();
+            ByteBuffer number = sortedNumbers.get(idx++);
+
+            Assert.assertEquals(number, term.getTerm());
+            Assert.assertEquals(convert(data.get(number)), convert(term.getTokens()));
+        }
+
+        // test partial iteration (ascending)
+        idx = 6; // start from the 6th element
+        partialIter = onDisk.iteratorAt(sortedNumbers.get(idx), OnDiskIndex.IteratorOrder.ASC, true);
+        while (partialIter.hasNext())
+        {
+            OnDiskIndex.DataTerm term = partialIter.next();
+            ByteBuffer number = sortedNumbers.get(idx--);
+
+            Assert.assertEquals(number, term.getTerm());
+            Assert.assertEquals(convert(data.get(number)), convert(term.getTokens()));
+        }
+
+        idx = 6; // start from the 6th element exclusive
+        partialIter = onDisk.iteratorAt(sortedNumbers.get(idx--), OnDiskIndex.IteratorOrder.ASC, false);
+        while (partialIter.hasNext())
+        {
+            OnDiskIndex.DataTerm term = partialIter.next();
+            ByteBuffer number = sortedNumbers.get(idx--);
+
+            Assert.assertEquals(number, term.getTerm());
+            Assert.assertEquals(convert(data.get(number)), convert(term.getTokens()));
+        }
+
+        onDisk.close();
+
+        List<ByteBuffer> iterCheckNums = new ArrayList<ByteBuffer>()
+        {{
+            add(Int32Type.instance.decompose(3));
+            add(Int32Type.instance.decompose(9));
+            add(Int32Type.instance.decompose(14));
+            add(Int32Type.instance.decompose(42));
+        }};
+
+        OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (int i = 0; i < iterCheckNums.size(); i++)
+            iterTest.add(iterCheckNums.get(i), keyAt((long) i), i);
+
+        File iterIndex = File.createTempFile("sa-iter", ".db");
+        iterIndex.deleteOnExit();
+
+        iterTest.finish(iterIndex);
+
+        onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter());
+
+        ByteBuffer number = Int32Type.instance.decompose(1);
+        Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
+        Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, true)));
+        Assert.assertEquals(4, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, false)));
+        Assert.assertEquals(4, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, true)));
+
+        number = Int32Type.instance.decompose(44);
+        Assert.assertEquals(4, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
+        Assert.assertEquals(4, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, true)));
+        Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, false)));
+        Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, true)));
+
+        number = Int32Type.instance.decompose(20);
+        Assert.assertEquals(3, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
+        Assert.assertEquals(3, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, true)));
+        Assert.assertEquals(1, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, false)));
+        Assert.assertEquals(1, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, true)));
+
+        number = Int32Type.instance.decompose(5);
+        Assert.assertEquals(1, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
+        Assert.assertEquals(1, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, true)));
+        Assert.assertEquals(3, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, false)));
+        Assert.assertEquals(3, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, true)));
+
+        number = Int32Type.instance.decompose(10);
+        Assert.assertEquals(2, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
+        Assert.assertEquals(2, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, true)));
+        Assert.assertEquals(2, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, false)));
+        Assert.assertEquals(2, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.DESC, true)));
+
+        onDisk.close();
+    }
+
+    @Test
+    public void testMultiSuffixMatches() throws Exception
+    {
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, UTF8Type.instance, OnDiskIndexBuilder.Mode.CONTAINS)
+        {{
+                addAll(this, UTF8Type.instance.decompose("Eliza"), 
keyBuilder(1L, 2L));
+                addAll(this, UTF8Type.instance.decompose("Elizabeth"), 
keyBuilder(3L, 4L));
+                addAll(this, UTF8Type.instance.decompose("Aliza"), 
keyBuilder(5L, 6L));
+                addAll(this, UTF8Type.instance.decompose("Taylor"), 
keyBuilder(7L, 8L));
+                addAll(this, UTF8Type.instance.decompose("Pavel"), 
keyBuilder(9L, 10L));
+        }};
+
+        File index = File.createTempFile("on-disk-sa-multi-suffix-match", 
".db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+
+        Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz"))));
+        Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a"))));
+        Assert.assertEquals(convert(5, 6), convert(onDisk.search(expressionFor("A"))));
+        Assert.assertEquals(convert(1, 2, 3, 4), convert(onDisk.search(expressionFor("E"))));
+        Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("l"))));
+        Assert.assertEquals(convert(3, 4), convert(onDisk.search(expressionFor("bet"))));
+        Assert.assertEquals(convert(3, 4, 9, 10), convert(onDisk.search(expressionFor("e"))));
+        Assert.assertEquals(convert(7, 8), convert(onDisk.search(expressionFor("yl"))));
+        Assert.assertEquals(convert(7, 8), convert(onDisk.search(expressionFor("T"))));
+        Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("za"))));
+        Assert.assertEquals(convert(3, 4), convert(onDisk.search(expressionFor("ab"))));
+
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("Pi"))));
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("ethz"))));
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("liw"))));
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("Taw"))));
+        Assert.assertEquals(Collections.<DecoratedKey>emptySet(), convert(onDisk.search(expressionFor("Av"))));
+
+        onDisk.close();
+    }
+
+    @Test
+    public void testSparseMode() throws Exception
+    {
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+
+        final long start = System.currentTimeMillis();
+        final int numIterations = 100000;
+
+        for (long i = 0; i < numIterations; i++)
+            builder.add(LongType.instance.decompose(start + i), keyAt(i), i);
+
+        File index = File.createTempFile("on-disk-sa-sparse", "db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        for (long step = start; step < (start + numIterations); step += 1000)
+        {
+            boolean lowerInclusive = random.nextBoolean();
+            boolean upperInclusive = random.nextBoolean();
+
+            long limit = random.nextLong(step, start + numIterations);
+            RangeIterator<Long, Token> rows = 
onDisk.search(expressionFor(step, lowerInclusive, limit, upperInclusive));
+
+            long lowerKey = step - start;
+            long upperKey = lowerKey + (limit - step);
+
+            if (!lowerInclusive)
+                lowerKey += 1;
+
+            if (upperInclusive)
+                upperKey += 1;
+
+            Set<DecoratedKey> actual = convert(rows);
+            for (long key = lowerKey; key < upperKey; key++)
+                Assert.assertTrue("key" + key + " wasn't found", 
actual.contains(keyAt(key)));
+
+            Assert.assertEquals((upperKey - lowerKey), actual.size());
+        }
+
+        // let's also explicitly test whole range search
+        RangeIterator<Long, Token> rows = onDisk.search(expressionFor(start, 
true, start + numIterations, true));
+
+        Set<DecoratedKey> actual = convert(rows);
+        Assert.assertEquals(numIterations, actual.size());
+    }
+
+    @Test
+    public void testNotEqualsQueryForStrings() throws Exception
+    {
+        Map<ByteBuffer, TokenTreeBuilder> data = new HashMap<ByteBuffer, 
TokenTreeBuilder>()
+        {{
+                put(UTF8Type.instance.decompose("Pavel"),   keyBuilder(1L, 
2L));
+                put(UTF8Type.instance.decompose("Jason"),   keyBuilder(3L));
+                put(UTF8Type.instance.decompose("Jordan"),  keyBuilder(4L));
+                put(UTF8Type.instance.decompose("Michael"), keyBuilder(5L, 
6L));
+                put(UTF8Type.instance.decompose("Vijay"),   keyBuilder(7L));
+                put(UTF8Type.instance.decompose("Travis"),  keyBuilder(8L));
+                put(UTF8Type.instance.decompose("Aleksey"), keyBuilder(9L, 
10L));
+        }};
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, UTF8Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+            addAll(builder, e.getKey(), e.getValue());
+
+        File index = File.createTempFile("on-disk-sa-except-test", "db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+
+        // test whole words first
+        Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel"))));
+
+        Assert.assertEquals(convert(3, 4, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel", "Michael"))));
+
+        Assert.assertEquals(convert(3, 4, 7, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel", "Michael", "Travis"))));
+
+        // now test prefixes
+        Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pav"))));
+
+        Assert.assertEquals(convert(3, 4, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel", "Mic"))));
+
+        Assert.assertEquals(convert(3, 4, 7, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel", "Micha", "Tr"))));
+
+        onDisk.close();
+    }
+
+    @Test
+    public void testNotEqualsQueryForNumbers() throws Exception
+    {
+        final Map<ByteBuffer, TokenTreeBuilder> data = new HashMap<ByteBuffer, 
TokenTreeBuilder>()
+        {{
+                put(Int32Type.instance.decompose(5),  keyBuilder(1L));
+                put(Int32Type.instance.decompose(7),  keyBuilder(2L));
+                put(Int32Type.instance.decompose(1),  keyBuilder(3L));
+                put(Int32Type.instance.decompose(3),  keyBuilder(1L, 4L));
+                put(Int32Type.instance.decompose(8),  keyBuilder(8L, 6L));
+                put(Int32Type.instance.decompose(10), keyBuilder(5L));
+                put(Int32Type.instance.decompose(6),  keyBuilder(7L));
+                put(Int32Type.instance.decompose(4),  keyBuilder(9L, 10L));
+                put(Int32Type.instance.decompose(0),  keyBuilder(11L, 12L, 1L));
+        }};
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
+            addAll(builder, e.getKey(), e.getValue());
+
+        File index = File.createTempFile("on-disk-sa-except-int-test", "db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+
+        Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1))));
+        Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8))));
+        Assert.assertEquals(convert(1, 2, 4, 5, 7, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8, 4))));
+
+        onDisk.close();
+    }
+
+    @Test
+    public void testRangeQueryWithExclusions() throws Exception
+    {
+        final long lower = 0;
+        final long upper = 100000;
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+        for (long i = lower; i <= upper; i++)
+            builder.add(LongType.instance.decompose(i), keyAt(i), i);
+
+        File index = File.createTempFile("on-disk-sa-except-long-ranges", 
"db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // single exclusion
+
+        // let's do small range first to figure out if searchPoint works properly
+        validateExclusions(onDisk, lower, 50, Sets.newHashSet(42L));
+        // now let's do whole data set to test SPARSE searching
+        validateExclusions(onDisk, lower, upper, Sets.newHashSet(31337L));
+
+        // pair of exclusions which would generate a split
+
+        validateExclusions(onDisk, lower, random.nextInt(400, 800), Sets.newHashSet(42L, 154L));
+        validateExclusions(onDisk, lower, upper, Sets.newHashSet(31337L, 54631L));
+
+        // 3 exclusions which would generate a split and change bounds
+
+        validateExclusions(onDisk, lower, random.nextInt(400, 800), Sets.newHashSet(42L, 154L));
+        validateExclusions(onDisk, lower, upper, Sets.newHashSet(31337L, 54631L));
+
+        validateExclusions(onDisk, lower, random.nextLong(400, upper), Sets.newHashSet(42L, 55L));
+        validateExclusions(onDisk, lower, random.nextLong(400, upper), Sets.newHashSet(42L, 55L, 93L));
+        validateExclusions(onDisk, lower, random.nextLong(400, upper), Sets.newHashSet(42L, 55L, 93L, 205L));
+
+        Set<Long> exclusions = Sets.newHashSet(3L, 12L, 13L, 14L, 27L, 54L, 
81L, 125L, 384L, 771L, 1054L, 2048L, 78834L);
+
+        // test that exclusions are properly bound by lower/upper of the expression
+        Assert.assertEquals(392, validateExclusions(onDisk, lower, 400, exclusions, false));
+        Assert.assertEquals(101, validateExclusions(onDisk, lower, 100, Sets.newHashSet(-10L, -5L, -1L), false));
+
+        validateExclusions(onDisk, lower, upper, exclusions);
+
+        Assert.assertEquals(100000, convert(onDisk.search(new Expression("", 
LongType.instance)
+                                                    .add(Operator.NEQ, 
LongType.instance.decompose(100L)))).size());
+
+        Assert.assertEquals(49, convert(onDisk.search(new Expression("", 
LongType.instance)
+                                                    .add(Operator.LT, 
LongType.instance.decompose(50L))
+                                                    .add(Operator.NEQ, 
LongType.instance.decompose(10L)))).size());
+
+        Assert.assertEquals(99998, convert(onDisk.search(new Expression("", 
LongType.instance)
+                                                    .add(Operator.GT, 
LongType.instance.decompose(1L))
+                                                    .add(Operator.NEQ, 
LongType.instance.decompose(20L)))).size());
+
+        onDisk.close();
+    }
+
+    private void validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions)
+    {
+        validateExclusions(sa, lower, upper, exclusions, true);
+    }
+
+    private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions, boolean checkCount)
+    {
+        int count = 0;
+        for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
+        {
+            String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1];
+            Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId)));
+            count++;
+        }
+
+        if (checkCount)
+            Assert.assertEquals(upper - (lower == 0 ? -1 : lower) - exclusions.size(), count);
+
+        return count;
+    }
+
+    @Test
+    public void testDescriptor() throws Exception
+    {
+        final Map<ByteBuffer, Pair<DecoratedKey, Long>> data = new 
HashMap<ByteBuffer, Pair<DecoratedKey, Long>>()
+        {{
+                put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L));
+        }};
+
+        OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (Map.Entry<ByteBuffer, Pair<DecoratedKey, Long>> e : 
data.entrySet())
+        {
+            DecoratedKey key = e.getValue().left;
+            Long position = e.getValue().right;
+
+            builder1.add(e.getKey(), key, position);
+            builder2.add(e.getKey(), key, position);
+        }
+
+        File index1 = File.createTempFile("on-disk-sa-int", "db");
+        File index2 = File.createTempFile("on-disk-sa-int2", "db");
+        index1.deleteOnExit();
+        index2.deleteOnExit();
+
+        builder1.finish(index1);
+        builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2);
+
+        OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter());
+        OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter());
+
+        ByteBuffer number = Int32Type.instance.decompose(5);
+
+        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Int32Type.instance, number))));
+        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Int32Type.instance, number))));
+
+        Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION);
+        Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA);
+    }
+
+    @Test
+    public void testSuperBlocks() throws Exception
+    {
+        Map<ByteBuffer, TokenTreeBuilder> terms = new HashMap<>();
+        terms.put(UTF8Type.instance.decompose("1234"), keyBuilder(1L, 2L));
+        terms.put(UTF8Type.instance.decompose("2345"), keyBuilder(3L, 4L));
+        terms.put(UTF8Type.instance.decompose("3456"), keyBuilder(5L, 6L));
+        terms.put(UTF8Type.instance.decompose("4567"), keyBuilder(7L, 8L));
+        terms.put(UTF8Type.instance.decompose("5678"), keyBuilder(9L, 10L));
+
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.SPARSE);
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> entry : terms.entrySet())
+            addAll(builder, entry.getKey(), entry.getValue());
+
+        File index = File.createTempFile("on-disk-sa-try-superblocks", ".db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+        OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0);
+        Iterator<Token> iter = superBlock.iterator();
+
+        Long lastToken = null;
+        while (iter.hasNext())
+        {
+            Token token = iter.next();
+
+            if (lastToken != null)
+                Assert.assertTrue(lastToken.compareTo(token.get()) < 0);
+
+            lastToken = token.get();
+        }
+    }
+
+    @Test
+    public void testSuperBlockRetrieval() throws Exception
+    {
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+        for (long i = 0; i < 100000; i++)
+            builder.add(LongType.instance.decompose(i), keyAt(i), i);
+
+        File index = File.createTempFile("on-disk-sa-multi-superblock-match", 
".db");
+        index.deleteOnExit();
+
+        builder.finish(index);
+
+        OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+
+        testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500);
+        testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456);
+        testSearchRangeWithSuperBlocks(onDiskIndex, 210, 1700);
+        testSearchRangeWithSuperBlocks(onDiskIndex, 530, 3200);
+
+        Random random = new Random(0xdeadbeef);
+        for (int i = 0; i < 100000; i += random.nextInt(1500)) // random steps 
with max of 1500 elements
+        {
+            for (int j = 0; j < 3; j++)
+                testSearchRangeWithSuperBlocks(onDiskIndex, i, ThreadLocalRandom.current().nextInt(i, 100000));
+        }
+    }
+
+    @Test
+    public void testCombiningOfThePartitionedSA() throws Exception
+    {
+        OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
+
+        TreeMap<Long, TreeMap<Long, LongSet>> expected = new TreeMap<>();
+
+        for (long i = 0; i <= 100; i++)
+        {
+            TreeMap<Long, LongSet> offsets = expected.get(i);
+            if (offsets == null)
+                expected.put(i, (offsets = new TreeMap<>()));
+
+            builderA.add(LongType.instance.decompose(i), keyAt(i), i);
+            offsets.putAll(keyBuilder(i).getTokens());
+        }
+
+        for (long i = 50; i < 100; i++)
+        {
+            TreeMap<Long, LongSet> offsets = expected.get(i);
+            if (offsets == null)
+                expected.put(i, (offsets = new TreeMap<>()));
+
+            long position = 100L + i;
+            builderB.add(LongType.instance.decompose(i), keyAt(position), position);
+            offsets.putAll(keyBuilder(100L + i).getTokens());
+        }
+
+        File indexA = File.createTempFile("on-disk-sa-partition-a", ".db");
+        indexA.deleteOnExit();
+
+        File indexB = File.createTempFile("on-disk-sa-partition-b", ".db");
+        indexB.deleteOnExit();
+
+        builderA.finish(indexA);
+        builderB.finish(indexB);
+
+        OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter());
+        OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter());
+
+        RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union = 
OnDiskIndexIterator.union(a, b);
+
+        TreeMap<Long, TreeMap<Long, LongSet>> actual = new TreeMap<>();
+        while (union.hasNext())
+        {
+            CombinedTerm term = union.next();
+
+            Long composedTerm = LongType.instance.compose(term.getTerm());
+
+            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+            if (offsets == null)
+                actual.put(composedTerm, (offsets = new TreeMap<>()));
+
+            offsets.putAll(term.getTokens());
+        }
+
+        Assert.assertEquals(actual, expected);
+
+        File indexC = File.createTempFile("on-disk-sa-partition-final", ".db");
+        indexC.deleteOnExit();
+
+        OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b));
+
+        OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter());
+        union = OnDiskIndexIterator.union(c);
+        actual.clear();
+
+        while (union.hasNext())
+        {
+            CombinedTerm term = union.next();
+
+            Long composedTerm = LongType.instance.compose(term.getTerm());
+
+            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+            if (offsets == null)
+                actual.put(composedTerm, (offsets = new TreeMap<>()));
+
+            offsets.putAll(term.getTokens());
+        }
+
+        Assert.assertEquals(actual, expected);
+
+        a.close();
+        b.close();
+    }
+
+    private void testSearchRangeWithSuperBlocks(OnDiskIndex onDiskIndex, long start, long end)
+    {
+        RangeIterator<Long, Token> tokens = 
onDiskIndex.search(expressionFor(start, true, end, false));
+
+        int keyCount = 0;
+        Long lastToken = null;
+        while (tokens.hasNext())
+        {
+            Token token = tokens.next();
+            Iterator<DecoratedKey> keys = token.iterator();
+
+            // each of the values should have exactly a single key
+            Assert.assertTrue(keys.hasNext());
+            keys.next();
+            Assert.assertFalse(keys.hasNext());
+
+            // and the last token should always be smaller than the current one
+            if (lastToken != null)
+                Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0);
+
+            lastToken = token.get();
+            keyCount++;
+        }
+
+        Assert.assertEquals(end - start, keyCount);
+    }
+
+    private static DecoratedKey keyAt(long rawKey)
+    {
+        ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes());
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key);
+    }
+
+    private static TokenTreeBuilder keyBuilder(Long... keys)
+    {
+        TokenTreeBuilder builder = new TokenTreeBuilder();
+
+        for (final Long key : keys)
+        {
+            DecoratedKey dk = keyAt(key);
+            builder.add((Long) dk.getToken().getTokenValue(), key);
+        }
+
+        return builder.finish();
+    }
+
+    private static Set<DecoratedKey> convert(TokenTreeBuilder offsets)
+    {
+        Set<DecoratedKey> result = new HashSet<>();
+
+        Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator();
+        while (offsetIter.hasNext())
+        {
+            LongSet v = offsetIter.next().right;
+
+            for (LongCursor offset : v)
+                result.add(keyAt(offset.value));
+        }
+        return result;
+    }
+
+    private static Set<DecoratedKey> convert(long... keyOffsets)
+    {
+        Set<DecoratedKey> result = new HashSet<>();
+        for (long offset : keyOffsets)
+            result.add(keyAt(offset));
+
+        return result;
+    }
+
+    private static Set<DecoratedKey> convert(RangeIterator<Long, Token> 
results)
+    {
+        if (results == null)
+            return Collections.emptySet();
+
+        Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator);
+
+        while (results.hasNext())
+        {
+            for (DecoratedKey key : results.next())
+                keys.add(key);
+        }
+
+        return keys;
+    }
+
+    private static Expression expressionFor(long lower, boolean lowerInclusive, long upper, boolean upperInclusive)
+    {
+        Expression expression = new Expression("", LongType.instance);
+        expression.add(lowerInclusive ? Operator.GTE : Operator.GT, LongType.instance.decompose(lower));
+        expression.add(upperInclusive ? Operator.LTE : Operator.LT, LongType.instance.decompose(upper));
+        return expression;
+    }
+
+    private static Expression expressionFor(AbstractType<?> validator, 
ByteBuffer term)
+    {
+        Expression expression = new Expression("", validator);
+        expression.add(Operator.EQ, term);
+        return expression;
+    }
+
+    private static Expression expressionForNot(AbstractType<?> validator, 
ByteBuffer lower, ByteBuffer upper, Iterable<ByteBuffer> terms)
+    {
+        Expression expression = new Expression("", validator);
+        expression.setOp(Expression.Op.RANGE);
+        expression.setLower(new Expression.Bound(lower, true));
+        expression.setUpper(new Expression.Bound(upper, true));
+        for (ByteBuffer term : terms)
+            expression.add(Operator.NEQ, term);
+        return expression;
+
+    }
+
+    private static Expression expressionForNot(Integer lower, Integer upper, Integer... terms)
+    {
+        return expressionForNot(Int32Type.instance,
+                Int32Type.instance.decompose(lower),
+                Int32Type.instance.decompose(upper),
+                Arrays.asList(terms).stream().map(Int32Type.instance::decompose).collect(Collectors.toList()));
+    }
+
+    private static Expression rangeWithExclusions(long lower, boolean lowerInclusive, long upper, boolean upperInclusive, Set<Long> exclusions)
+    {
+        Expression expression = expressionFor(lower, lowerInclusive, upper, upperInclusive);
+        for (long e : exclusions)
+            expression.add(Operator.NEQ, LongType.instance.decompose(e));
+
+        return expression;
+    }
+
+    private static Expression expressionForNot(String lower, String upper, String... terms)
+    {
+        return expressionForNot(UTF8Type.instance,
+                UTF8Type.instance.decompose(lower),
+                UTF8Type.instance.decompose(upper),
+                Arrays.asList(terms).stream().map(UTF8Type.instance::decompose).collect(Collectors.toList()));
+    }
+
+    private static Expression expressionFor(String term)
+    {
+        return expressionFor(UTF8Type.instance, UTF8Type.instance.decompose(term));
+    }
+
+    private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
+    {
+        for (Map.Entry<Long, LongSet> token : tokens.getTokens().entrySet())
+        {
+            for (long position : token.getValue().toArray())
+                builder.add(term, keyAt(position), position);
+        }
+    }
+
+    private static class KeyConverter implements Function<Long, DecoratedKey>
+    {
+        @Override
+        public DecoratedKey apply(Long offset)
+        {
+            return keyAt(offset);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
new file mode 100644
index 0000000..4663692
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.sasi.SASIIndex;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import com.google.common.util.concurrent.Futures;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PerSSTableIndexWriterTest extends SchemaLoader
+{
+    private static final String KS_NAME = "sasi";
+    private static final String CF_NAME = "test_cf";
+
+    @BeforeClass
+    public static void loadSchema() throws ConfigurationException
+    {
+        System.setProperty("cassandra.config", "cassandra-murmur.yaml");
+        SchemaLoader.loadSchema();
+        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME,
+                                                                     KeyspaceParams.simpleTransient(1),
+                                                                     Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
+    }
+
+    @Test
+    public void testPartialIndexWrites() throws Exception
+    {
+        final int maxKeys = 100000, numParts = 4, partSize = maxKeys / numParts;
+        final String keyFormat = "key%06d";
+        final long timestamp = System.currentTimeMillis();
+
+        ColumnFamilyStore cfs = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
+        ColumnDefinition column = cfs.metadata.getColumnDefinition(UTF8Type.instance.decompose("age"));
+
+        SASIIndex sasi = (SASIIndex) cfs.indexManager.getIndexByName("age");
+
+        File directory = cfs.getDirectories().getDirectoryForNewSSTables();
+        Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
+        PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH);
+
+        SortedMap<DecoratedKey, Row> expectedKeys = new 
TreeMap<>(DecoratedKey.comparator);
+
+        for (int i = 0; i < maxKeys; i++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
+            expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
+                             BTreeRow.singleCellRow(Clustering.EMPTY,
+                                                    BufferCell.live(cfs.metadata, column, timestamp, Int32Type.instance.decompose(i))));
+        }
+
+        indexWriter.begin();
+
+        Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = 
expectedKeys.entrySet().iterator();
+        long position = 0;
+
+        Set<String> segments = new HashSet<>();
+        outer:
+        for (;;)
+        {
+            for (int i = 0; i < partSize; i++)
+            {
+                if (!keyIterator.hasNext())
+                    break outer;
+
+                Map.Entry<DecoratedKey, Row> key = keyIterator.next();
+
+                indexWriter.startPartition(key.getKey(), position++);
+                indexWriter.nextUnfilteredCluster(key.getValue());
+            }
+
+            PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
+
+            OnDiskIndex segment = index.scheduleSegmentFlush(false).call();
+            index.segments.add(Futures.immediateFuture(segment));
+            segments.add(segment.getIndexPath());
+        }
+
+        for (String segment : segments)
+            Assert.assertTrue(new File(segment).exists());
+
+        String indexFile = indexWriter.indexes.get(column).filename(true);
+
+        // final flush
+        indexWriter.complete();
+
+        for (String segment : segments)
+            Assert.assertFalse(new File(segment).exists());
+
+        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
+            return cfs.metadata.partitioner.decorateKey(key);
+        });
+
+        Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0))));
+        Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1))));
+
+        Set<DecoratedKey> actualKeys = new HashSet<>();
+        int count = 0;
+        for (OnDiskIndex.DataTerm term : index)
+        {
+            RangeIterator<Long, Token> tokens = term.getTokens();
+
+            while (tokens.hasNext())
+            {
+                for (DecoratedKey key : tokens.next())
+                    actualKeys.add(key);
+            }
+
+            Assert.assertEquals(count++, (int) Int32Type.instance.compose(term.getTerm()));
+        }
+
+        Assert.assertEquals(expectedKeys.size(), actualKeys.size());
+        for (DecoratedKey key : expectedKeys.keySet())
+            Assert.assertTrue(actualKeys.contains(key));
+
+        FileUtils.closeQuietly(index);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
new file mode 100644
index 0000000..d09b8d4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.MurmurHash;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.google.common.base.Function;
+
+public class TokenTreeTest
+{
+    private static final Function<Long, DecoratedKey> KEY_CONVERTER = new KeyConverter();
+
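+    // Offset-set fixtures covering the cases the tree builder distinguishes: a single offset,
+    // collisions small enough to pack into shorts/ints, and collisions that cannot be packed.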
+    static LongSet singleOffset = new LongOpenHashSet() {{ add(1); }};
+    static LongSet bigSingleOffset = new LongOpenHashSet() {{ add(((long) Integer.MAX_VALUE) + 10); }};
+    static LongSet shortPackableCollision = new LongOpenHashSet() {{ add(2L); add(3L); }}; // can pack two shorts
+    static LongSet intPackableCollision = new LongOpenHashSet() {{ add(6L); add(((long) Short.MAX_VALUE) + 1); }}; // can pack int & short
+    static LongSet multiCollision =  new LongOpenHashSet() {{ add(3L); add(4L); add(5L); }}; // can't pack
+    static LongSet unpackableCollision = new LongOpenHashSet() {{ add(((long) Short.MAX_VALUE) + 1); add(((long) Short.MAX_VALUE) + 2); }}; // can't pack
+
+    final static SortedMap<Long, LongSet> simpleTokenMap = new TreeMap<Long, LongSet>()
+    {{
+            put(1L, bigSingleOffset); put(3L, shortPackableCollision); put(4L, intPackableCollision); put(6L, singleOffset);
+            put(9L, multiCollision); put(10L, unpackableCollision); put(12L, singleOffset); put(13L, singleOffset);
+            put(15L, singleOffset); put(16L, singleOffset); put(20L, singleOffset); put(22L, singleOffset);
+            put(25L, singleOffset); put(26L, singleOffset); put(27L, singleOffset); put(28L, singleOffset);
+            put(40L, singleOffset); put(50L, singleOffset); put(100L, singleOffset); put(101L, singleOffset);
+            put(102L, singleOffset); put(103L, singleOffset); put(108L, singleOffset); put(110L, singleOffset);
+            put(112L, singleOffset); put(115L, singleOffset); put(116L, singleOffset); put(120L, singleOffset);
+            put(121L, singleOffset); put(122L, singleOffset); put(123L, singleOffset); put(125L, singleOffset);
+    }};
+
+    final static SortedMap<Long, LongSet> bigTokensMap = new TreeMap<Long, LongSet>()
+    {{
+            for (long i = 0; i < 1000000; i++)
+                put(i, singleOffset);
+    }};
+
+    final static SortedMap<Long, LongSet> collidingTokensMap = new TreeMap<Long, LongSet>()
+    {{
+            put(1L, singleOffset); put(7L, singleOffset); put(8L, singleOffset);
+    }};
+
+    final static SortedMap<Long, LongSet> tokens = bigTokensMap;
+
+    @Test
+    public void buildAndIterate() throws Exception
+    {
+        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
+        final Iterator<Pair<Long, LongSet>> tokenIterator = builder.iterator();
+        final Iterator<Map.Entry<Long, LongSet>> listIterator = tokens.entrySet().iterator();
+        while (tokenIterator.hasNext() && listIterator.hasNext())
+        {
+            Pair<Long, LongSet> tokenNext = tokenIterator.next();
+            Map.Entry<Long, LongSet> listNext = listIterator.next();
+
+            Assert.assertEquals(listNext.getKey(), tokenNext.left);
+            Assert.assertEquals(listNext.getValue(), tokenNext.right);
+        }
+
+        Assert.assertFalse("token iterator not finished", 
tokenIterator.hasNext());
+        Assert.assertFalse("list iterator not finished", 
listIterator.hasNext());
+    }
+
+    @Test
+    public void buildWithMultipleMapsAndIterate() throws Exception
+    {
+        final SortedMap<Long, LongSet> merged = new TreeMap<>();
+        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
+        builder.add(collidingTokensMap);
+
+        merged.putAll(collidingTokensMap);
+        for (Map.Entry<Long, LongSet> entry : simpleTokenMap.entrySet())
+        {
+            if (merged.containsKey(entry.getKey()))
+            {
+                LongSet mergingOffsets  = entry.getValue();
+                LongSet existingOffsets = merged.get(entry.getKey());
+
+                if (mergingOffsets.equals(existingOffsets))
+                    continue;
+
+                Set<Long> mergeSet = new HashSet<>();
+                for (LongCursor merging : mergingOffsets)
+                    mergeSet.add(merging.value);
+
+                for (LongCursor existing : existingOffsets)
+                    mergeSet.add(existing.value);
+
+                LongSet mergedResults = new LongOpenHashSet();
+                for (Long result : mergeSet)
+                    mergedResults.add(result);
+
+                merged.put(entry.getKey(), mergedResults);
+            }
+            else
+            {
+                merged.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        final Iterator<Pair<Long, LongSet>> tokenIterator = builder.iterator();
+        final Iterator<Map.Entry<Long, LongSet>> listIterator = merged.entrySet().iterator();
+        while (tokenIterator.hasNext() && listIterator.hasNext())
+        {
+            Pair<Long, LongSet> tokenNext = tokenIterator.next();
+            Map.Entry<Long, LongSet> listNext = listIterator.next();
+
+            Assert.assertEquals(listNext.getKey(), tokenNext.left);
+            Assert.assertEquals(listNext.getValue(), tokenNext.right);
+        }
+
+        Assert.assertFalse("token iterator not finished", 
tokenIterator.hasNext());
+        Assert.assertFalse("list iterator not finished", 
listIterator.hasNext());
+
+    }
+
+    @Test
+    public void testSerializedSize() throws Exception
+    {
+        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
+
+        final File treeFile = File.createTempFile("token-tree-size-test", 
"tt");
+        treeFile.deleteOnExit();
+
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP))
+        {
+            builder.write(writer);
+            writer.sync();
+        }
+
+        final RandomAccessReader reader = RandomAccessReader.open(treeFile);
+        Assert.assertEquals((int) reader.bytesRemaining(), builder.serializedSize());
+    }
+
+    @Test
+    public void buildSerializeAndIterate() throws Exception
+    {
+        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
+
+        final File treeFile = File.createTempFile("token-tree-iterate-test1", 
"tt");
+        treeFile.deleteOnExit();
+
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP))
+        {
+            builder.write(writer);
+            writer.sync();
+        }
+
+        final RandomAccessReader reader = RandomAccessReader.open(treeFile);
+        final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
+
+        final Iterator<Token> tokenIterator = tokenTree.iterator(KEY_CONVERTER);
+        final Iterator<Map.Entry<Long, LongSet>> listIterator = simpleTokenMap.entrySet().iterator();
+        while (tokenIterator.hasNext() && listIterator.hasNext())
+        {
+            Token treeNext = tokenIterator.next();
+            Map.Entry<Long, LongSet> listNext = listIterator.next();
+
+            Assert.assertEquals(listNext.getKey(), treeNext.get());
+            Assert.assertEquals(convert(listNext.getValue()), convert(treeNext));
+        }
+
+        Assert.assertFalse("token iterator not finished", 
tokenIterator.hasNext());
+        Assert.assertFalse("list iterator not finished", 
listIterator.hasNext());
+
+        reader.close();
+    }
+
+    @Test
+    public void buildSerializeAndGet() throws Exception
+    {
+        final long tokMin = 0;
+        final long tokMax = 1000;
+
+        final TokenTree tokenTree = generateTree(tokMin, tokMax);
+
+        for (long i = 0; i <= tokMax; i++)
+        {
+            TokenTree.OnDiskToken result = tokenTree.get(i, KEY_CONVERTER);
+            Assert.assertNotNull("failed to find object for token " + i, 
result);
+
+            Set<Long> found = result.getOffsets();
+            Assert.assertEquals(1, found.size());
+            Assert.assertEquals(i, found.toArray()[0]);
+        }
+
+        Assert.assertNull("found missing object", tokenTree.get(tokMax + 10, 
KEY_CONVERTER));
+    }
+
+    @Test
+    public void buildSerializeIterateAndSkip() throws Exception
+    {
+        final TokenTreeBuilder builder = new TokenTreeBuilder(tokens).finish();
+
+        final File treeFile = File.createTempFile("token-tree-iterate-test2", 
"tt");
+        treeFile.deleteOnExit();
+
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP))
+        {
+            builder.write(writer);
+            writer.sync();
+        }
+
+        final RandomAccessReader reader = RandomAccessReader.open(treeFile);
+        final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
+
+        final RangeIterator<Long, Token> treeIterator = tokenTree.iterator(KEY_CONVERTER);
+        final RangeIterator<Long, TokenWithOffsets> listIterator = new EntrySetSkippableIterator(tokens);
+
+        long lastToken = 0L;
+        while (treeIterator.hasNext() && lastToken < 12)
+        {
+            Token treeNext = treeIterator.next();
+            TokenWithOffsets listNext = listIterator.next();
+
+            Assert.assertEquals(listNext.token, (lastToken = treeNext.get()));
+            Assert.assertEquals(convert(listNext.offsets), convert(treeNext));
+        }
+
+        treeIterator.skipTo(100548L);
+        listIterator.skipTo(100548L);
+
+        while (treeIterator.hasNext() && listIterator.hasNext())
+        {
+            Token treeNext = treeIterator.next();
+            TokenWithOffsets listNext = listIterator.next();
+
+            Assert.assertEquals(listNext.token, (long) treeNext.get());
+            Assert.assertEquals(convert(listNext.offsets), convert(treeNext));
+
+        }
+
+        Assert.assertFalse("Tree iterator not completed", 
treeIterator.hasNext());
+        Assert.assertFalse("List iterator not completed", 
listIterator.hasNext());
+
+        reader.close();
+    }
+
+    @Test
+    public void skipPastEnd() throws Exception
+    {
+        final TokenTreeBuilder builder = new TokenTreeBuilder(simpleTokenMap).finish();
+
+        final File treeFile = File.createTempFile("token-tree-skip-past-test", 
"tt");
+        treeFile.deleteOnExit();
+
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP))
+        {
+            builder.write(writer);
+            writer.sync();
+        }
+
+        final RandomAccessReader reader = RandomAccessReader.open(treeFile);
+        final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KEY_CONVERTER);
+
+        tokenTree.skipTo(simpleTokenMap.lastKey() + 10);
+    }
+
+    @Test
+    public void testTokenMerge() throws Exception
+    {
+        final long min = 0, max = 1000;
+
+        // two different trees with the same offsets
+        TokenTree treeA = generateTree(min, max);
+        TokenTree treeB = generateTree(min, max);
+
+        RangeIterator<Long, Token> a = treeA.iterator(new KeyConverter());
+        RangeIterator<Long, Token> b = treeB.iterator(new KeyConverter());
+
+        long count = min;
+        while (a.hasNext() && b.hasNext())
+        {
+            final Token tokenA = a.next();
+            final Token tokenB = b.next();
+
+            // merging of two OnDiskToken
+            tokenA.merge(tokenB);
+            // merging with RAM Token with different offset
+            tokenA.merge(new TokenWithOffsets(tokenA.get(), convert(count + 1)));
+            // and RAM token with the same offset
+            tokenA.merge(new TokenWithOffsets(tokenA.get(), convert(count)));
+
+            // should fail when trying to merge different tokens
+            try
+            {
+                tokenA.merge(new TokenWithOffsets(tokenA.get() + 1, convert(count)));
+                Assert.fail();
+            }
+            catch (IllegalArgumentException e)
+            {
+                // expected
+            }
+
+            final Set<Long> offsets = new TreeSet<>();
+            for (DecoratedKey key : tokenA)
+                 offsets.add(LongType.instance.compose(key.getKey()));
+
+            Set<Long> expected = new TreeSet<>();
+            {
+                expected.add(count);
+                expected.add(count + 1);
+            }
+
+            Assert.assertEquals(expected, offsets);
+            count++;
+        }
+
+        Assert.assertEquals(max, count - 1);
+    }
+
+    @Test
+    public void testEntryTypeOrdinalLookup()
+    {
+        Assert.assertEquals(EntryType.SIMPLE, EntryType.of(EntryType.SIMPLE.ordinal()));
+        Assert.assertEquals(EntryType.PACKED, EntryType.of(EntryType.PACKED.ordinal()));
+        Assert.assertEquals(EntryType.FACTORED, EntryType.of(EntryType.FACTORED.ordinal()));
+        Assert.assertEquals(EntryType.OVERFLOW, EntryType.of(EntryType.OVERFLOW.ordinal()));
+    }
+
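+    // In-memory skippable iterator over the source token map, used as the expected sequence
+    // when exercising skipTo() against the serialized tree.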
+    private static class EntrySetSkippableIterator extends RangeIterator<Long, TokenWithOffsets>
+    {
+        private final PeekingIterator<Map.Entry<Long, LongSet>> elements;
+
+        EntrySetSkippableIterator(SortedMap<Long, LongSet> elms)
+        {
+            super(elms.firstKey(), elms.lastKey(), elms.size());
+            elements = Iterators.peekingIterator(elms.entrySet().iterator());
+        }
+
+        @Override
+        public TokenWithOffsets computeNext()
+        {
+            if (!elements.hasNext())
+                return endOfData();
+
+            Map.Entry<Long, LongSet> next = elements.next();
+            return new TokenWithOffsets(next.getKey(), next.getValue());
+        }
+
+        @Override
+        protected void performSkipTo(Long nextToken)
+        {
+            while (elements.hasNext())
+            {
+                if (Long.compare(elements.peek().getKey(), nextToken) >= 0)
+                {
+                    break;
+                }
+
+                elements.next();
+            }
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            // nothing to do here
+        }
+    }
+
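+    // Minimal in-memory Token used as the expected value in the comparisons above;
+    // merge() is a no-op because these instances are only merged into on-disk tokens.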
+    public static class TokenWithOffsets extends Token
+    {
+        private final LongSet offsets;
+
+        public TokenWithOffsets(long token, final LongSet offsets)
+        {
+            super(token);
+            this.offsets = offsets;
+        }
+
+        @Override
+        public void merge(CombinedValue<Long> other)
+        {}
+
+        @Override
+        public int compareTo(CombinedValue<Long> o)
+        {
+            return Long.compare(token, o.get());
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof TokenWithOffsets))
+                return false;
+
+            TokenWithOffsets o = (TokenWithOffsets) other;
+            return token == o.token && offsets.equals(o.offsets);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return new HashCodeBuilder().append(token).build();
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("TokenValue(token: %d, offsets: %s)", token, 
offsets);
+        }
+
+        @Override
+        public Iterator<DecoratedKey> iterator()
+        {
+            List<DecoratedKey> keys = new ArrayList<>(offsets.size());
+            for (LongCursor offset : offsets)
+                 keys.add(dk(offset.value));
+
+            return keys.iterator();
+        }
+    }
+
+    private static Set<DecoratedKey> convert(LongSet offsets)
+    {
+        Set<DecoratedKey> keys = new HashSet<>();
+        for (LongCursor offset : offsets)
+            keys.add(KEY_CONVERTER.apply(offset.value));
+
+        return keys;
+    }
+
+    private static Set<DecoratedKey> convert(Token results)
+    {
+        Set<DecoratedKey> keys = new HashSet<>();
+        for (DecoratedKey key : results)
+            keys.add(key);
+
+        return keys;
+    }
+
+    private static LongSet convert(long... values)
+    {
+        LongSet result = new LongOpenHashSet(values.length);
+        for (long v : values)
+            result.add(v);
+
+        return result;
+    }
+
+    private static class KeyConverter implements Function<Long, DecoratedKey>
+    {
+        @Override
+        public DecoratedKey apply(Long offset)
+        {
+            return dk(offset);
+        }
+    }
+
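+    // Turn an offset into a DecoratedKey: hash the serialized long and wrap the result
+    // in a Murmur3 LongToken, keeping the original bytes as the key.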
+    private static DecoratedKey dk(Long token)
+    {
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        buf.putLong(token);
+        buf.flip();
+        Long hashed = MurmurHash.hash2_64(buf, buf.position(), buf.remaining(), 0);
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
+    }
+
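+    // Build, serialize and re-open a tree where every token i in [minToken, maxToken]
+    // maps to the single offset {i}.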
+    private static TokenTree generateTree(final long minToken, final long maxToken) throws IOException
+    {
+        final SortedMap<Long, LongSet> toks = new TreeMap<Long, LongSet>()
+        {{
+                for (long i = minToken; i <= maxToken; i++)
+                {
+                    LongSet offsetSet = new LongOpenHashSet();
+                    offsetSet.add(i);
+                    put(i, offsetSet);
+                }
+        }};
+
+        final TokenTreeBuilder builder = new TokenTreeBuilder(toks).finish();
+        final File treeFile = File.createTempFile("token-tree-get-test", "tt");
+        treeFile.deleteOnExit();
+
+        try (SequentialWriter writer = new SequentialWriter(treeFile, 4096, BufferType.ON_HEAP))
+        {
+            builder.write(writer);
+            writer.sync();
+        }
+
+        RandomAccessReader reader = null;
+
+        try
+        {
+            reader = RandomAccessReader.open(treeFile);
+            return new TokenTree(new MappedBuffer(reader));
+        }
+        finally
+        {
+            FileUtils.closeQuietly(reader);
+        }
+    }
+}
