Update CQLSSTableWriter to allow parallel writing of SSTables on the same table within the same JVM
Patch by Carl Yeksigian, reviewed by Benjamin Lerer for CASSANDRA-7463
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f13ce558
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f13ce558
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f13ce558
Branch: refs/heads/cassandra-2.1
Commit: f13ce558cc410f959634a6f0d31fcf7bd69be85d
Parents: 748b01d
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Oct 29 13:57:25 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Oct 29 13:57:25 2014 -0500
----------------------------------------------------------------------
 CHANGES.txt | 2 +
 .../io/sstable/AbstractSSTableSimpleWriter.java | 12 ++-
 .../cassandra/io/sstable/CQLSSTableWriter.java | 38 +++++---
 .../io/sstable/CQLSSTableWriterTest.java | 93 ++++++++++++++++++++
 4 files changed, 131 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2cb003..9051b34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@
 * Pig: Remove errant LIMIT clause in CqlNativeStorage (CASSANDRA-8166)
 * Throw ConfigurationException when hsha is used with the default rpc_max_threads setting of 'unlimited' (CASSANDRA-8116)
+ * Allow concurrent writing of the same table in the same JVM using
+ CQLSSTableWriter (CASSANDRA-7463)
 2.0.11:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 
2c6f82a..af1c43c 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; @@ -43,6 +44,7 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable protected ColumnFamily columnFamily; protected ByteBuffer currentSuperColumn; protected final CounterId counterid = CounterId.generate(); + protected static AtomicInteger generation = new AtomicInteger(0); public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) { @@ -80,9 +82,15 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable return false; } }); - int maxGen = 0; + int maxGen = generation.getAndIncrement(); for (Descriptor desc : existing) - maxGen = Math.max(maxGen, desc.generation); + { + while (desc.generation > maxGen) + { + maxGen = generation.getAndIncrement(); + } + } + return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, true).filenameFor(Component.DATA); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 49a1259..93d3dcf 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import 
org.apache.cassandra.cql3.statements.*; import org.apache.cassandra.cql3.*; @@ -335,18 +336,31 @@ public class CQLSSTableWriter implements Closeable { try { - this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild(); - - // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly - // build the insert statement in using(). - KSMetaData ksm = KSMetaData.newKeyspace(this.schema.ksName, - AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), - ImmutableMap.of("replication_factor", "1"), - true, - Collections.singleton(this.schema)); - - Schema.instance.load(ksm); - return this; + synchronized (CQLSSTableWriter.class) + { + this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild(); + + // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly + // build the insert statement in using(). 
+ KSMetaData ksm = Schema.instance.getKSMetaData(this.schema.ksName); + if (ksm == null) + { + ksm = KSMetaData.newKeyspace(this.schema.ksName, + AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), + ImmutableMap.of("replication_factor", "1"), + true, + Collections.singleton(this.schema)); + Schema.instance.load(ksm); + } + else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null) + { + Schema.instance.load(this.schema); + ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema))); + Schema.instance.setKeyspaceDefinition(ksm); + Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false); + } + return this; + } } catch (RequestValidationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13ce558/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index de814e1..0f123a4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -158,4 +158,97 @@ public class CQLSSTableWriterTest }; assert tempdir.list(filterDataFiles).length > 1 : Arrays.toString(tempdir.list(filterDataFiles)); } + + + private static final int NUMBER_WRITES_IN_RUNNABLE = 10; + private class WriterThread extends Thread + { + private final File dataDir; + private final int id; + public volatile Exception exception; + + public WriterThread(File dataDir, int id) + { + this.dataDir = dataDir; + this.id = id; + } + + @Override + public void run() + { + String schema = "CREATE TABLE cql_keyspace.table2 (" + + " k int," + + " v int," + + " PRIMARY KEY (k, v)" + + ")"; + String insert = "INSERT INTO cql_keyspace.table2 (k, v) 
VALUES (?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .withPartitioner(StorageService.instance.getPartitioner()) + .using(insert).build(); + + try + { + for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++) + { + writer.addRow(id, i); + } + writer.close(); + } + catch (Exception e) + { + exception = e; + } + } + } + + @Test + public void testConcurrentWriters() throws Exception + { + String KS = "cql_keyspace"; + String TABLE = "table2"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + WriterThread[] threads = new WriterThread[5]; + for (int i = 0; i < threads.length; i++) + { + WriterThread thread = new WriterThread(dataDir, i); + threads[i] = thread; + thread.start(); + } + + for (WriterThread thread : threads) + { + thread.join(); + assert !thread.isAlive() : "Thread should be dead by now"; + if (thread.exception != null) + { + throw thread.exception; + } + } + + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + public void init(String keyspace) + { + for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + setPartitioner(StorageService.getPartitioner()); + } + + public CFMetaData getCFMetaData(String keyspace, String cfName) + { + return Schema.instance.getCFMetaData(keyspace, cfName); + } + }, new OutputHandler.SystemOutput(false, false)); + + loader.stream().get(); + + UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table2;"); + assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); + } }