Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/258e59fc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/258e59fc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/258e59fc Branch: refs/heads/trunk Commit: 258e59fcf3eae9f808e9c37c0aee2c54458e3cf9 Parents: ac75ce3 ecf48dd Author: Yuki Morishita <yu...@apache.org> Authored: Thu Mar 5 11:52:37 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Mar 5 11:52:37 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/config/Config.java | 12 +++ .../cassandra/config/DatabaseDescriptor.java | 12 ++- .../cql3/statements/UpdateStatement.java | 41 ++++++-- src/java/org/apache/cassandra/db/Keyspace.java | 8 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 56 ++++++---- .../io/sstable/CQLSSTableWriterClientTest.java | 102 +++++++++++++++++++ .../io/sstable/CQLSSTableWriterTest.java | 23 +++-- 8 files changed, 215 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Config.java index 7e1e9ee,c683d7b..a174a0f --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -205,9 -200,10 +205,11 @@@ public class Confi public volatile int counter_cache_save_period = 7200; public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE; - public String memory_allocator = NativeAllocator.class.getSimpleName(); + @Deprecated + public String memory_allocator; + private static 
boolean isClientMode = false; + public Integer file_cache_size_in_mb; public boolean inter_dc_tcp_nodelay = true; @@@ -239,6 -231,16 +241,16 @@@ outboundBindAny = value; } + public static boolean isClientMode() + { - return isClientMode; ++ return isClientMode; + } + + public static void setClientMode(boolean clientMode) + { + isClientMode = clientMode; + } + public void configHintedHandoff() throws ConfigurationException { if (hinted_handoff_enabled != null && !hinted_handoff_enabled.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index b1a2684,924ab3c..af21f74 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -92,16 -115,36 +92,26 @@@ public class DatabaseDescripto private static String localDC; private static Comparator<InetAddress> localComparator; + public static void forceStaticInitialization() {} static { + // In client mode, we use a default configuration. Note that the fields of this class will be + // left unconfigured however (the partitioner or localDC will be null for instance) so this + // should be used with care. try { - applyConfig(loadConfig()); + if (Config.isClientMode()) + { + conf = new Config(); - // at least we have to set memoryAllocator to open SSTable in client mode - memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator); + } + else + { + applyConfig(loadConfig()); + } } - catch (ConfigurationException e) - { - logger.error("Fatal configuration error", e); - System.err.println(e.getMessage() + "\nFatal configuration error; unable to start. 
See log for stacktrace."); - System.exit(1); - } catch (Exception e) { - logger.error("Fatal error during configuration loading", e); - System.err.println(e.getMessage() + "\nFatal error during configuration loading; unable to start. See log for stacktrace."); - JVMStabilityInspector.inspectThrowable(e); - System.exit(1); + throw new ExceptionInInitializerError(e); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index b7156a0,63f87c0..c783d48 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@@ -30,6 -30,6 +30,8 @@@ import org.apache.cassandra.exceptions. import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; ++import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; ++ /** * An <code>UPDATE</code> statement parsed from a CQL query statement. * @@@ -98,6 -109,20 +111,20 @@@ public class UpdateStatement extends Mo update.execute(key, cf, prefix, params); } + // validateIndexedColumns trigger a call to Keyspace.open() which we want to be able to avoid in some case + //(e.g. when using CQLSSTableWriter) + if (validateIndexedColumns) + validateIndexedColumns(cf); + } + + /** - * Checks that the value of the indexed columns is valid. ++ * Checks if the values of the indexed columns are valid. 
+ * + * @param cf the column family - * @throws InvalidRequestException if one of the values is invalid ++ * @throws InvalidRequestException if one of the values of the indexed columns is not valid + */ - private void validateIndexedColumns(ColumnFamily cf) throws InvalidRequestException ++ private void validateIndexedColumns(ColumnFamily cf) + { SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager; if (indexManager.hasIndexes()) { @@@ -105,11 -130,11 +132,11 @@@ { // Indexed values must be validated by any applicable index. See CASSANDRA-3057/4240/8081 for more details if (!indexManager.validate(cell)) -- throw new InvalidRequestException(String.format("Can't index column value of size %d for index %s on %s.%s", -- cell.value().remaining(), -- cfm.getColumnDefinition(cell.name()).getIndexName(), -- cfm.ksName, -- cfm.cfName)); ++ throw invalidRequest("Can't index column value of size %d for index %s on %s.%s", ++ cell.value().remaining(), ++ cfm.getColumnDefinition(cell.name()).getIndexName(), ++ cfm.ksName, ++ cfm.cfName); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index bffddc6,d92eea4..e3301b1 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -67,7 -65,8 +64,8 @@@ public class Keyspac // proper directories here as well as in CassandraDaemon. 
static { - DatabaseDescriptor.createAllDirectories(); - if (!(Config.isClientMode() || StorageService.instance.isClientMode())) ++ if (!Config.isClientMode()) + DatabaseDescriptor.createAllDirectories(); } public final KSMetaData metadata; http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index d137fe2,b4bef7d..53cb094 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@@ -21,22 -21,19 +21,21 @@@ import java.io.Closeable import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import org.apache.cassandra.cql3.statements.*; +import org.apache.cassandra.config.CFMetaData; ++import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.config.*; -import org.apache.cassandra.db.*; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.statements.UpdateStatement; +import org.apache.cassandra.db.ArrayBackedSortedColumns; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; - import org.apache.cassandra.db.Keyspace; - import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; @@@ -388,6 -367,35 
+371,35 @@@ public class CQLSSTableWriter implement } /** - * Adds the specified column family to the specified keyspace. ++ * Creates the keyspace with the specified table. + * - * @param ksm the keyspace meta data - * @param cfm the column family meta data ++ * @param table the table that must be created. + */ - private static void addTableToKeyspace(KSMetaData ksm, CFMetaData cfm) ++ private static void createKeyspaceWithTable(CFMetaData table) + { - ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm))); - Schema.instance.load(cfm); - Schema.instance.setKeyspaceDefinition(ksm); ++ KSMetaData ksm; ++ ksm = KSMetaData.newKeyspace(table.ksName, ++ AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), ++ ImmutableMap.of("replication_factor", "1"), ++ true, ++ Collections.singleton(table)); ++ Schema.instance.load(ksm); + } + + /** - * Creates a keyspace for the specified column family. ++ * Adds the table to the specified keyspace. + * - * @param cfm the column family - * @throws ConfigurationException if a problem occurs while creating the keyspace. ++ * @param keyspace the keyspace to add to ++ * @param table the table to add + */ - private static void createKeyspaceWithTable(CFMetaData cfm) throws ConfigurationException ++ private static void addTableToKeyspace(KSMetaData keyspace, CFMetaData table) + { - KSMetaData ksm = KSMetaData.newKeyspace(cfm.ksName, - AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), - ImmutableMap.of("replication_factor", "1"), - true, - Collections.singleton(cfm)); - Schema.instance.load(ksm); ++ KSMetaData clone = keyspace.cloneWithTableAdded(table); ++ Schema.instance.load(table); ++ Schema.instance.setKeyspaceDefinition(clone); + } + + /** * The partitioner to use. * <p> * By default, {@code Murmur3Partitioner} will be used. 
If this is not the partitioner used http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java index 0000000,d10c9fb..ad2d876 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@@ -1,0 -1,116 +1,102 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.io.sstable; + + import java.io.File; + import java.io.FilenameFilter; + import java.io.IOException; + import java.util.Arrays; + + import com.google.common.io.Files; + + import org.junit.*; + + import org.apache.cassandra.config.Config; + import org.apache.cassandra.exceptions.InvalidRequestException; + import org.apache.cassandra.io.util.FileUtils; + + import static org.junit.Assert.assertEquals; + + import static org.junit.Assert.assertTrue; + + public class CQLSSTableWriterClientTest + { + private File testDirectory; + + @Before + public void setUp() + { + this.testDirectory = Files.createTempDir(); + } + + @After + public void tearDown() + { + FileUtils.deleteRecursive(this.testDirectory); + } + + @AfterClass + public static void cleanup() throws Exception + { + Config.setClientMode(false); + } + + @Test + public void testWriterInClientMode() throws IOException, InvalidRequestException + { + final String TABLE1 = "table1"; + final String TABLE2 = "table2"; + + String schema = "CREATE TABLE client_test.%s (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; + String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(this.testDirectory) + .forTable(String.format(schema, TABLE1)) + .using(String.format(insert, TABLE1)).build(); + + CQLSSTableWriter writer2 = CQLSSTableWriter.builder() + .inDirectory(this.testDirectory) + .forTable(String.format(schema, TABLE2)) + .using(String.format(insert, TABLE2)).build(); + + writer.addRow(0, "A", 0); + writer2.addRow(0, "A", 0); + writer.addRow(1, "B", 1); + writer2.addRow(1, "B", 1); + writer.close(); + writer2.close(); + - assertContainsDataFiles(this.testDirectory, "client_test-table1", "client_test-table2"); - } - - /** - * Checks that the specified directory contains the files with the specified prefixes. 
- * - * @param directory the directory containing the data files - * @param prefixes the file prefixes - */ - private static void assertContainsDataFiles(File directory, String... prefixes) - { + FilenameFilter filter = new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith("-Data.db"); + } + }; + - File[] dataFiles = directory.listFiles(filter); - Arrays.sort(dataFiles); ++ File[] dataFiles = this.testDirectory.listFiles(filter); ++ assertEquals(2, dataFiles.length); + - assertEquals(dataFiles.length, prefixes.length); - for (int i = 0; i < dataFiles.length; i++) - assertTrue(dataFiles[i].toString().contains(prefixes[i])); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/258e59fc/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 8f879d9,a2f1bcc..6278b45 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@@ -51,6 -56,12 +56,12 @@@ public class CQLSSTableWriterTes StorageService.instance.initServer(); } + @AfterClass - public static void tearDown() ++ public static void tearDown() throws Exception + { + Config.setClientMode(false); + } + @Test public void testUnsortedWriter() throws Exception { @@@ -211,8 -217,8 +222,8 @@@ @Test public void testConcurrentWriters() throws Exception { - String KS = "cql_keyspace"; - String KS = "cql_keyspace2"; -- String TABLE = "table2"; ++ final String KS = "cql_keyspace2"; ++ final String TABLE = "table2"; File tempdir = Files.createTempDir(); File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); @@@ -240,7 -246,7 +251,7 @@@ { public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) - for 
(Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace2")) ++ for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); setPartitioner(StorageService.getPartitioner()); }