BulkOutputFormat no longer unnecessarily looks for cassandra.yaml. Patch by brandonwilliams, reviewed by Chris Goffinet for CASSANDRA-3740
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2c22a85 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2c22a85 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2c22a85 Branch: refs/heads/trunk Commit: d2c22a855c260a9c4e51ac41f42b7b3de46a0a7b Parents: e0c655d Author: Brandon Williams <brandonwilli...@apache.org> Authored: Tue Feb 14 07:29:35 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Tue Feb 14 07:29:35 2012 -0600 ---------------------------------------------------------------------- src/java/org/apache/cassandra/config/Config.java | 11 +++ .../cassandra/config/DatabaseDescriptor.java | 68 +++++++-------- .../apache/cassandra/hadoop/BulkRecordWriter.java | 10 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 10 ++- .../apache/cassandra/io/sstable/SSTableLoader.java | 8 ++ .../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +- .../cassandra/io/sstable/SSTableSimpleWriter.java | 9 ++- .../apache/cassandra/service/StorageService.java | 31 +++---- .../io/sstable/SSTableSimpleWriterTest.java | 5 +- 9 files changed, 92 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index ec134a0..69e0c40 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -136,6 +136,7 @@ public class Config public int row_cache_keys_to_save = Integer.MAX_VALUE; public String row_cache_provider = ConcurrentLinkedHashCacheProvider.class.getSimpleName(); + private static boolean loadYaml = true; private static boolean outboundBindAny = false; public static boolean getOutboundBindAny() @@ -148,6 +149,16 @@ public class Config outboundBindAny = value; } + public static boolean getLoadYaml() + { + return loadYaml; + } + + public static void setLoadYaml(boolean value) + { + loadYaml = value; + } + public static enum CommitLogSync { periodic, batch http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index fdff2a7..20a5466 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -111,38 +111,36 @@ public class DatabaseDescriptor return url; } - public static void initDefaultsOnly() + static { - conf = new Config(); + if (Config.getLoadYaml()) + loadYaml(); + else + conf = new Config(); } - - static + static void loadYaml() { try { - // only load yaml if conf wasn't already set - if (conf == null) + URL url = getStorageConfigURL(); + logger.info("Loading settings from " + url); + InputStream input = null; + try { - URL url = getStorageConfigURL(); - logger.info("Loading settings from " + url); - InputStream input = null; - try - { - input = url.openStream(); - } - catch (IOException e) - { - // getStorageConfigURL should have ruled this out - throw new AssertionError(e); - } - org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); - TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); - seedDesc.putMapPropertyType("parameters", String.class, String.class); - constructor.addTypeDescription(seedDesc); - Yaml yaml = new Yaml(new Loader(constructor)); - conf = (Config)yaml.load(input); + input = url.openStream(); } - + catch (IOException e) + { + // getStorageConfigURL should have ruled this out + throw new AssertionError(e); + } + org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); + TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); + seedDesc.putMapPropertyType("parameters", String.class, String.class); + constructor.addTypeDescription(seedDesc); + Yaml yaml = new Yaml(new Loader(constructor)); + conf = (Config)yaml.load(input); + if (conf.commitlog_sync == null) { throw new ConfigurationException("Missing required directive CommitLogSync"); @@ -153,7 +151,7 @@ public class DatabaseDescriptor if (conf.commitlog_sync_batch_window_in_ms == null) { throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected."); - } + } else if (conf.commitlog_sync_period_in_ms != null) { throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync"); @@ -173,7 +171,7 @@ public class DatabaseDescriptor logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms); } - /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */ + /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */ if (conf.disk_access_mode == Config.DiskAccessMode.auto) { conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard; @@ -202,7 +200,7 @@ public class DatabaseDescriptor authority = FBUtilities.<IAuthority>construct(conf.authority, "authority"); authenticator.validateConfiguration(); authority.validateConfiguration(); - + /* Hashing strategy */ if (conf.partitioner == null) { @@ -222,9 +220,9 @@ public class DatabaseDescriptor { throw new ConfigurationException("phi_convict_threshold must be between 5 and 16"); } - + /* Thread per pool */ - if (conf.concurrent_reads != null && conf.concurrent_reads < 2) + if (conf.concurrent_reads != null && conf.concurrent_reads < 2) { throw new ConfigurationException("concurrent_reads must be at least 2"); } @@ -275,7 +273,7 @@ public class DatabaseDescriptor { throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!"); } - + try { broadcastAddress = InetAddress.getByName(conf.broadcast_address); @@ -285,7 +283,7 @@ public class DatabaseDescriptor throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'"); } } - + /* Local IP or hostname to bind RPC server to */ if (conf.rpc_address != null) { @@ -360,7 +358,7 @@ public class DatabaseDescriptor { logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap); } - + if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0) { throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer"); @@ -438,7 +436,7 @@ public class DatabaseDescriptor { throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required."); } - try + try { Class seedProviderClass = Class.forName(conf.seed_provider.class_name); seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index bd2bdbc..9962f24 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.net.UnknownHostException; import java.util.*; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.AbstractType; @@ -73,10 +74,6 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private CFType cfType; private ColType colType; - static { - DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml - } - BulkRecordWriter(TaskAttemptContext context) throws IOException { this(context.getConfiguration()); @@ -84,10 +81,12 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> BulkRecordWriter(Configuration conf) throws IOException { + Config.setLoadYaml(false); + Config.setOutboundBindAny(true); this.conf = conf; DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0"))); String keyspace = ConfigHelper.getOutputKeyspace(conf); - outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader + outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); //dir must be named by ks/cf for the loader outputdir.mkdirs(); } @@ -123,6 +122,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> subcomparator = BytesType.instance; this.writer = new SSTableSimpleUnsortedWriter( outputdir, + ConfigHelper.getOutputPartitioner(conf), ConfigHelper.getOutputKeyspace(conf), ConfigHelper.getOutputColumnFamily(conf), BytesType.instance, http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 163bbbd..ed76a4c 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -26,9 +26,10 @@ import java.util.HashSet; import java.util.Set; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.NodeId; import org.apache.cassandra.utils.Pair; @@ -41,10 +42,11 @@ public abstract class AbstractSSTableSimpleWriter protected SuperColumn currentSuperColumn; protected final NodeId nodeid = NodeId.generate(); - public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata) + public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) { this.metadata = metadata; this.directory = directory; + DatabaseDescriptor.setPartitioner(partitioner); } protected SSTableWriter getWriter() throws IOException @@ -53,7 +55,7 @@ public abstract class AbstractSSTableSimpleWriter makeFilename(directory, metadata.ksName, metadata.cfName), 0, // We don't care about the bloom filter metadata, - StorageService.getPartitioner(), + DatabaseDescriptor.getPartitioner(), SSTableMetadata.createCollector()); } @@ -91,7 +93,7 @@ public abstract class AbstractSSTableSimpleWriter if (currentKey != null && !columnFamily.isEmpty()) writeRow(currentKey, columnFamily); - currentKey = StorageService.getPartitioner().decorateKey(key); + currentKey = DatabaseDescriptor.getPartitioner().decorateKey(key); columnFamily = getColumnFamily(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 131deb2..1ee7a2f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -28,7 +28,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -47,6 +49,11 @@ public class SSTableLoader private final Client client; private final OutputHandler outputHandler; + static + { + Config.setLoadYaml(false); + } + public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { this.directory = directory; @@ -265,6 +272,7 @@ public class SSTableLoader protected void setPartitioner(String partclass) throws ConfigurationException { this.partitioner = FBUtilities.newPartitioner(partclass); + DatabaseDescriptor.setPartitioner(partitioner); } public IPartitioner getPartitioner() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 4e05a41..eadc16d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.Condition; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.HeapAllocator; @@ -59,6 +60,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter /** * Create a new buffering writer. * @param directory the directory where to write the sstables + * @param partitioner the partitioner * @param keyspace the keyspace name * @param columnFamily the column family name * @param comparator the column family comparator @@ -68,13 +70,14 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter * columns you add). For 1GB of heap, a 128 bufferSizeInMB is probably a reasonable choice. If you experience OOM, this value should be lowered. */ public SSTableSimpleUnsortedWriter(File directory, + IPartitioner partitioner, String keyspace, String columnFamily, AbstractType<?> comparator, AbstractType<?> subComparator, int bufferSizeInMB) throws IOException { - super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator)); + super(directory, new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner); this.bufferSize = bufferSizeInMB * 1024L * 1024L; this.diskWriter.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index 10242da..d9a2b6e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; /** * A SSTable writer that assumes rows are in (partitioner) sorted order. @@ -43,24 +44,26 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter /** * Create a new writer. * @param directory the directory where to write the sstable + * @param partitioner the partitioner * @param keyspace the keyspace name * @param columnFamily the column family name * @param comparator the column family comparator * @param subComparator the column family subComparator or null if not a Super column family. */ public SSTableSimpleWriter(File directory, + IPartitioner partitioner, String keyspace, String columnFamily, AbstractType<?> comparator, AbstractType<?> subComparator) throws IOException { this(directory, - new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator)); + new CFMetaData(keyspace, columnFamily, subComparator == null ? ColumnFamilyType.Standard : ColumnFamilyType.Super, comparator, subComparator), partitioner); } - public SSTableSimpleWriter(File directory, CFMetaData metadata) throws IOException + public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner) throws IOException { - super(directory, metadata); + super(directory, metadata, partitioner); writer = getWriter(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 9bcd54d..d06b4a2 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -184,14 +184,13 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe /* This abstraction maintains the token/endpoint metadata information */ private TokenMetadata tokenMetadata_ = new TokenMetadata(); - private IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - public VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + public VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner()); public static final StorageService instance = new StorageService(); public static IPartitioner getPartitioner() { - return instance.partitioner; + return DatabaseDescriptor.getPartitioner(); } public Collection<Range<Token>> getLocalRanges(String table) @@ -609,12 +608,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe String initialToken = DatabaseDescriptor.getInitialToken(); if (initialToken == null) { - token = partitioner.getRandomToken(); + token = getPartitioner().getRandomToken(); logger_.warn("Generated random token " + token + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations"); } else { - token = partitioner.getTokenFactory().fromString(initialToken); + token = getPartitioner().getTokenFactory().fromString(initialToken); logger_.info("Saved token not found. Using " + token + " from configuration"); } } @@ -1987,12 +1986,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe public List<InetAddress> getNaturalEndpoints(String table, String cf, String key) { CFMetaData cfMetaData = Schema.instance.getTableDefinition(table).cfMetaData().get(cf); - return getNaturalEndpoints(table, partitioner.getToken(cfMetaData.getKeyValidator().fromString(key))); + return getNaturalEndpoints(table, getPartitioner().getToken(cfMetaData.getKeyValidator().fromString(key))); } public List<InetAddress> getNaturalEndpoints(String table, ByteBuffer key) { - return getNaturalEndpoints(table, partitioner.getToken(key)); + return getNaturalEndpoints(table, getPartitioner().getToken(key)); } /** @@ -2018,7 +2017,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe */ public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key) { - return getLiveNaturalEndpoints(table, partitioner.decorateKey(key)); + return getLiveNaturalEndpoints(table, getPartitioner().decorateKey(key)); } public List<InetAddress> getLiveNaturalEndpoints(String table, RingPosition pos) @@ -2088,7 +2087,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Token token; if (keys.size() < 3) { - token = partitioner.midpoint(range.left, range.right); + token = getPartitioner().midpoint(range.left, range.right); logger_.debug("Used midpoint to assign token " + token); } else @@ -2203,8 +2202,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe public void move(String newToken) throws IOException, InterruptedException, ConfigurationException { - partitioner.getTokenFactory().validate(newToken); - move(partitioner.getTokenFactory().fromString(newToken)); + getPartitioner().getTokenFactory().validate(newToken); + move(getPartitioner().getTokenFactory().fromString(newToken)); } /** @@ -2396,7 +2395,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { InetAddress myAddress = FBUtilities.getBroadcastAddress(); Token localToken = tokenMetadata_.getToken(myAddress); - Token token = partitioner.getTokenFactory().fromString(tokenString); + Token token = getPartitioner().getTokenFactory().fromString(tokenString); InetAddress endpoint = tokenMetadata_.getEndpoint(token); if (endpoint == null) @@ -2585,9 +2584,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // Never ever do this at home. Used by tests. IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) { - IPartitioner oldPartitioner = partitioner; - partitioner = newPartitioner; - valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner(); + DatabaseDescriptor.setPartitioner(newPartitioner); + valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner()); return oldPartitioner; } @@ -2607,7 +2606,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { List<Token> sortedTokens = new ArrayList<Token>(tokenMetadata_.getTokenToEndpointMapForReading().keySet()); Collections.sort(sortedTokens); - Map<Token, Float> token_map = partitioner.describeOwnership(sortedTokens); + Map<Token, Float> token_map = getPartitioner().describeOwnership(sortedTokens); Map<String, Float> string_map = new HashMap<String, Float>(); for(Map.Entry<Token, Float> entry : token_map.entrySet()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2c22a85/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java index 613eed4..c9edd53 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java @@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable; import java.io.File; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.RandomPartitioner; import org.junit.Test; import org.apache.cassandra.CleanupHelper; @@ -44,7 +46,8 @@ public class SSTableSimpleWriterTest extends CleanupHelper File dir = Directories.create(tablename, cfname).getDirectoryForNewSSTables(0); assert dir.exists(); - SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(dir, tablename, cfname, IntegerType.instance, null, 16); + IPartitioner partitioner = new RandomPartitioner(); + SSTableSimpleUnsortedWriter writer = new SSTableSimpleUnsortedWriter(dir, partitioner, tablename, cfname, IntegerType.instance, null, 16); int k = 0;