Repository: cassandra Updated Branches: refs/heads/trunk 9797511c5 -> fa1131679
Fix rebuild of SASI indexes with existing index files Patch by Alex Petrov; reviewed by Pavel Yaskevich for CASSANDRA-12374 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa113167 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa113167 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa113167 Branch: refs/heads/trunk Commit: fa113167956a6163156a0f475171d1c41f9ed7c2 Parents: 9797511 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Fri Aug 5 18:05:38 2016 +0200 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Thu Aug 18 15:05:11 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/index/sasi/SASIIndex.java | 1 + .../cassandra/index/sasi/SASIIndexBuilder.java | 3 +- .../cassandra/index/sasi/conf/ColumnIndex.java | 5 ++ .../cassandra/index/sasi/conf/DataTracker.java | 19 ++++++++ .../cassandra/index/sasi/SASIIndexTest.java | 49 ++++++++++++++++++++ 6 files changed, 77 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 07c18c5..0e1e118 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054) * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 0b9d900..4375964 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -73,6 +73,7 @@ public class SASIIndex implements Index, INotificationConsumer .filter((i) -> i instanceof SASIIndex) .forEach((i) -> { SASIIndex sasi = (SASIIndex) i; + sasi.index.dropData(sstablesToRebuild); sstablesToRebuild.stream() .filter((sstable) -> !sasi.index.hasSSTable(sstable)) .forEach((sstable) -> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index 1173d40..d50875a 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -99,7 +99,8 @@ class SASIIndexBuilder extends SecondaryIndexBuilder try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key)) { // if the row has statics attached, it has to be indexed separately - indexWriter.nextUnfilteredCluster(partition.staticRow()); + if (cfs.metadata.hasStaticColumns()) + indexWriter.nextUnfilteredCluster(partition.staticRow()); while (partition.hasNext()) indexWriter.nextUnfilteredCluster(partition.next()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java index 440d475..0958113 100644 --- 
a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java @@ -194,6 +194,11 @@ public class ColumnIndex return tracker.hasSSTable(sstable); } + public void dropData(Collection<SSTableReader> sstablesToRebuild) + { + tracker.dropData(sstablesToRebuild); + } + public void dropData(long truncateUntil) { switchMemtable(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/src/java/org/apache/cassandra/index/sasi/conf/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/conf/DataTracker.java b/src/java/org/apache/cassandra/index/sasi/conf/DataTracker.java index 9475d12..af8e07d 100644 --- a/src/java/org/apache/cassandra/index/sasi/conf/DataTracker.java +++ b/src/java/org/apache/cassandra/index/sasi/conf/DataTracker.java @@ -92,6 +92,25 @@ public class DataTracker return false; } + public void dropData(Collection<SSTableReader> sstablesToRebuild) + { + View currentView = view.get(); + if (currentView == null) + return; + + Set<SSTableReader> toRemove = new HashSet<>(sstablesToRebuild); + for (SSTableIndex index : currentView) + { + SSTableReader sstable = index.getSSTable(); + if (!sstablesToRebuild.contains(sstable)) + continue; + + index.markObsolete(); + } + + update(toRemove, Collections.<SSTableReader>emptyList()); + } + public void dropData(long truncateUntil) { View currentView = view.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa113167/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java index e2797e2..0b4e9e2 100644 --- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java @@ -17,7 
+17,14 @@ */ package org.apache.cassandra.index.sasi; +import java.io.File; +import java.io.FileWriter; +import java.io.Writer; import java.nio.ByteBuffer; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,6 +44,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; @@ -52,6 +60,8 @@ import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException; import org.apache.cassandra.index.sasi.memory.IndexMemtable; import org.apache.cassandra.index.sasi.plan.QueryController; import org.apache.cassandra.index.sasi.plan.QueryPlan; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; @@ -1860,6 +1870,45 @@ public class SASIIndexTest } @Test + public void testTableRebuild() throws Exception + { + ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CLUSTERING_CF_NAME_1); + + executeCQL(CLUSTERING_CF_NAME_1, "INSERT INTO %s.%s (name, nickname, location, age, height, score) VALUES (?, ?, ?, ?, ?, ?)", "Pavel", "xedin", "US", 27, 183, 1.0); + executeCQL(CLUSTERING_CF_NAME_1, "INSERT INTO %s.%s (name, location, age, height, score) VALUES (?, ?, ?, ?, ?)", "Pavel", "BY", 28, 182, 2.0); + executeCQL(CLUSTERING_CF_NAME_1, "INSERT INTO %s.%s (name, nickname, location, age, height, score) 
VALUES (?, ?, ?, ?, ?, ?)", "Jordan", "jrwest", "US", 27, 182, 1.0); + + store.forceBlockingFlush(); + + SSTable ssTable = store.getSSTables(SSTableSet.LIVE).iterator().next(); + Path path = FileSystems.getDefault().getPath(ssTable.getFilename().replace("-Data", "-SI_age")); + + // Overwrite index file with garbage + Writer writer = new FileWriter(path.toFile(), false); + writer.write("garbage"); + writer.close(); + long size1 = Files.readAttributes(path, BasicFileAttributes.class).size(); + + // Trying to query the corrupted index file yields no results + Assert.assertTrue(executeCQL(CLUSTERING_CF_NAME_1, "SELECT * FROM %s.%s WHERE age = 27 AND name = 'Pavel'").isEmpty()); + + // Rebuild index + store.rebuildSecondaryIndex("age"); + + long size2 = Files.readAttributes(path, BasicFileAttributes.class).size(); + // Make sure that garbage was overwritten + Assert.assertTrue(size2 > size1); + + // Make sure that indexes work for rebuilt tables + CQLTester.assertRows(executeCQL(CLUSTERING_CF_NAME_1, "SELECT * FROM %s.%s WHERE age = 27 AND name = 'Pavel'"), + CQLTester.row("Pavel", "US", 27, "xedin", 183, 1.0)); + CQLTester.assertRows(executeCQL(CLUSTERING_CF_NAME_1, "SELECT * FROM %s.%s WHERE age = 28"), + CQLTester.row("Pavel", "BY", 28, "xedin", 182, 2.0)); + CQLTester.assertRows(executeCQL(CLUSTERING_CF_NAME_1, "SELECT * FROM %s.%s WHERE score < 2.0 AND nickname = 'jrwest' ALLOW FILTERING"), + CQLTester.row("Jordan", "US", 27, "jrwest", 182, 1.0)); + } + + @Test public void testInvalidIndexOptions() { ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);