Author: johan Date: Thu May 6 16:42:27 2010 New Revision: 941797 URL: http://svn.apache.org/viewvc?rev=941797&view=rev Log: Update word count example to work with changes in trunk. Word count setup now uses the thrift interface. Patch by Jeremy Hanna, review by johan. CASSANDRA-1030
Added: cassandra/trunk/contrib/word_count/cassandra.yaml Removed: cassandra/trunk/contrib/word_count/storage-conf.xml Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup cassandra/trunk/contrib/word_count/build.xml cassandra/trunk/contrib/word_count/src/WordCountSetup.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count_setup?rev=941797&r1=941796&r2=941797&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/bin/word_count_setup (original) +++ cassandra/trunk/contrib/word_count/bin/word_count_setup Thu May 6 16:42:27 2010 @@ -53,4 +53,8 @@ if [ "x$JAVA" = "x" ]; then exit 1 fi -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountSetup +HOST=localhost +PORT=9160 +FRAMED=false + +$JAVA -Xmx1G -ea -Dcassandra.host=$HOST -Dcassandra.port=$PORT -Dcassandra.framed=$FRAMED -cp $CLASSPATH WordCountSetup Modified: cassandra/trunk/contrib/word_count/build.xml URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/build.xml?rev=941797&r1=941796&r2=941797&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/build.xml (original) +++ cassandra/trunk/contrib/word_count/build.xml Thu May 6 16:42:27 2010 @@ -77,7 +77,7 @@ <zipfileset dir="${cassandra.dir}/build/lib/jars/" prefix="lib"> <include name="**/*.jar" /> </zipfileset> - <fileset file="${basedir}/storage-conf.xml" /> + <fileset file="${basedir}/cassandra.yaml" /> </jar> </target> Added: cassandra/trunk/contrib/word_count/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/cassandra.yaml?rev=941797&view=auto 
============================================================================== --- cassandra/trunk/contrib/word_count/cassandra.yaml (added) +++ cassandra/trunk/contrib/word_count/cassandra.yaml Thu May 6 16:42:27 2010 @@ -0,0 +1,199 @@ +# Cassandra storage config YAML +# See http://wiki.apache.org/cassandra/StorageConfiguration for +# explanations of configuration directives. + +# name of the cluster +cluster_name: 'Test Cluster' + +# Set to true to make new [non-seed] nodes automatically migrate the +# right data to themselves. +auto_bootstrap: false + +# authentication backend, implementing IAuthenticator; used to limit keyspace access +authenticator: org.apache.cassandra.auth.AllowAllAuthenticator + +# any IPartitioner may be used, including your own as long as it is on +# the classpath. Out of the box, Cassandra provides +# org.apache.cassandra.dht.RandomPartitioner, +# org.apache.cassandra.dht.OrderPreservingPartitioner, and +# org.apache.cassandra.dht.CollatingOrderPreservingPartitioner. +partitioner: org.apache.cassandra.dht.RandomPartitioner + +# directories where Cassandra should store data on disk. +data_file_directories: + - /var/lib/cassandra/data + +# Addresses of hosts that are deemed contact points. +# Cassandra nodes use this list of hosts to find each other and learn +# the topology of the ring. You must change this if you are running +# multiple nodes! +seeds: + - 127.0.0.1 + +# Access mode. mmapped i/o is substantially faster, but only practical on +# a 64bit machine (which notably does not include EC2 "small" instances) +# or relatively small datasets. "auto", the safe choice, will enable +# mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only" +# (which may allow you to get part of the benefits of mmap on a 32bit +# machine by mmapping only index files) and "standard". +# (The buffer size settings that follow only apply to standard, +# non-mmapped i/o.)
+disk_access_mode: auto + +# Unlike most systems, in Cassandra writes are faster than reads, so +# you can afford more of those in parallel. A good rule of thumb is 2 +# concurrent reads per processor core. Increase concurrent_writes to +# the number of clients writing at once if you enable batch-mode +# commitlog_sync. +concurrent_reads: 8 +concurrent_writes: 32 + +# Buffer size to use when performing contiguous column slices. +# Increase this to the size of the column slices you typically perform. +sliced_buffer_size_in_kb: 64 + +# TCP port, for commands and data +storage_port: 7000 + +# Address to bind to and tell other nodes to connect to. You _must_ +# change this if you want multiple nodes to be able to communicate! +listen_address: localhost + +# The address to bind the Thrift RPC service to +rpc_address: localhost +# port for Thrift to listen on +rpc_port: 9160 +# Whether or not to use a framed transport for Thrift. +thrift_framed_transport: false +snapshot_before_compaction: false + +# The threshold size in megabytes the binary memtable must grow to, +# before it's submitted for flushing to disk. +binary_memtable_throughput_in_mb: 256 +# Number of minutes to keep a memtable in memory +memtable_flush_after_mins: 60 +# Size of the memtable in memory before it is dumped +memtable_throughput_in_mb: 64 +# Number of objects in millions in the memtable before it is dumped +memtable_operations_in_millions: 0.3 +# Buffer size to use when flushing memtables to disk. +flush_data_buffer_size_in_mb: 32 +# Increase (decrease) the index buffer size relative to the data +# buffer if you have few (many) columns per key. +flush_index_buffer_size_in_mb: 8 + +column_index_size_in_kb: 64 +row_warning_threshold_in_mb: 512 + +# commit log +commitlog_directory: /var/lib/cassandra/commitlog + +# Size to allow commitlog to grow to before creating a new segment +commitlog_rotation_threshold_in_mb: 128 + +# commitlog_sync may be either "periodic" or "batch."
+# When in batch mode, Cassandra won't ack writes until the commit log +# has been fsynced to disk. It will wait up to +# commitlog_sync_batch_window_in_ms milliseconds for other writes, before +# performing the sync. +commitlog_sync: periodic + +# the other option is "periodic," where writes may be acked immediately +# and the CommitLog is simply synced every commitlog_sync_period_in_ms +# milliseconds. +commitlog_sync_period_in_ms: 10000 + +# Time to wait for a reply from other nodes before failing the command +rpc_timeout_in_ms: 10000 + +# time to wait before garbage collecting tombstones (deletion markers) +gc_grace_seconds: 864000 + +# endpoint_snitch -- Set this to a class that implements +# IEndpointSnitch, which will let Cassandra know enough +# about your network topology to route requests efficiently. +# Out of the box, Cassandra provides +# org.apache.cassandra.locator.SimpleSnitch, +# org.apache.cassandra.locator.RackInferringSnitch, and +# org.apache.cassandra.locator.PropertyFileSnitch. +endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch + +# A ColumnFamily is the Cassandra concept closest to a relational table. +# +# Keyspaces are separate groups of ColumnFamilies. Except in very +# unusual circumstances you will have one Keyspace per application. +# +# Keyspace required parameters: +# - name: name of the keyspace; "system" and "definitions" are +# reserved for Cassandra internals. +# - replica_placement_strategy: the class that determines how replicas +# are distributed among nodes. Must implement IReplicaPlacementStrategy. +# Out of the box, Cassandra provides +# org.apache.cassandra.locator.RackUnawareStrategy and +# org.apache.cassandra.locator.RackAwareStrategy. RackAwareStrategy +# places one replica in each of two datacenters, and the other replicas in +# different racks in one of those datacenters.
+# - replication_factor: Number of replicas of each row +# - column_families: column families associated with this keyspace +# +# ColumnFamily required parameters: +# - name: name of the ColumnFamily. Must not contain the character "-". +# - compare_with: tells Cassandra how to sort the columns for slicing +# operations. The default is BytesType, which is a straightforward +# lexical comparison of the bytes in each column. Other options are +# AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType, and LongType. +# You can also specify the fully-qualified class name to a class of +# your choice extending org.apache.cassandra.db.marshal.AbstractType. +# +# ColumnFamily optional parameters: +# - keys_cached: specifies the number of keys per sstable whose +# locations we keep in memory in "mostly LRU" order. (JUST the key +# locations, NOT any column values.) Specify a fraction (value less +# than 1) or an absolute number of keys to cache. Defaults to 200000 +# keys. +# - rows_cached: specifies the number of rows whose entire contents we +# cache in memory. Do not use this on ColumnFamilies with large rows, +# or ColumnFamilies with high write:read ratios. Specify a fraction +# (value less than 1) or an absolute number of rows to cache. +# Defaults to 0. (i.e. row caching is off by default) +# - comment: used to attach additional human-readable information about +# the column family to its definition. +# - read_repair_chance: specifies the probability with which read +# repairs should be invoked on non-quorum reads. must be between 0 +# and 1. defaults to 1.0 (always read repair). +# - preload_row_cache: If true, will populate row cache on startup. +# Defaults to false. 
+# +keyspaces: + - name: Keyspace1 + replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy + replication_factor: 1 + column_families: + - name: Standard1 + compare_with: BytesType + + - name: Standard2 + compare_with: UTF8Type + read_repair_chance: 0.1 + keys_cached: 100 + + - name: StandardByUUID1 + compare_with: TimeUUIDType + + - name: Super1 + column_type: Super + compare_with: BytesType + compare_subcolumns_with: BytesType + + - name: Super2 + column_type: Super + compare_subcolumns_with: UTF8Type + preload_row_cache: true + rows_cached: 10000 + keys_cached: 50 + comment: 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings' + + - name: Super3 + column_type: Super + compare_with: LongType + comment: 'A column family with supercolumns, whose column names are Longs (8 bytes)' Modified: cassandra/trunk/contrib/word_count/src/WordCountSetup.java URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCountSetup.java?rev=941797&r1=941796&r2=941797&view=diff ============================================================================== --- cassandra/trunk/contrib/word_count/src/WordCountSetup.java (original) +++ cassandra/trunk/contrib/word_count/src/WordCountSetup.java Thu May 6 16:42:27 2010 @@ -16,16 +16,19 @@ * limitations under the License. 
*/ -import java.util.Arrays; +import java.util.*; +import org.apache.cassandra.thrift.*; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.*; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.ConsistencyLevel; - public class WordCountSetup { private static final Logger logger = LoggerFactory.getLogger(WordCountSetup.class); @@ -34,47 +37,93 @@ public class WordCountSetup public static void main(String[] args) throws Exception { - StorageService.instance.initClient(); - logger.info("Sleeping " + WordCount.RING_DELAY); - Thread.sleep(WordCount.RING_DELAY); - assert !StorageService.instance.getLiveNodes().isEmpty(); - - RowMutation rm; - ColumnFamily cf; - byte[] columnName; + Cassandra.Iface client = createConnection(); + + setupKeyspace(client); + + client.set_keyspace(WordCount.KEYSPACE); - // text0: no rows + Map<byte[], Map<String,List<Mutation>>> mutationMap; + Column c; // text1: 1 row, 1 word - columnName = "text1".getBytes(); - rm = new RowMutation(WordCount.KEYSPACE, "Key0".getBytes()); - cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); - cf.addColumn(new Column(columnName, "word1".getBytes(), 0)); - rm.add(cf); - StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE); + c = new Column("text1".getBytes(), "word1".getBytes(), System.currentTimeMillis()); + mutationMap = getMutationMap("key0".getBytes(), WordCount.COLUMN_FAMILY, c); + client.batch_mutate(mutationMap, ConsistencyLevel.ONE); logger.info("added text1"); - // text2: 1 row, 2 words - columnName = 
"text2".getBytes(); - rm = new RowMutation(WordCount.KEYSPACE, "Key0".getBytes()); - cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); - cf.addColumn(new Column(columnName, "word1 word2".getBytes(), 0)); - rm.add(cf); - StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE); + // text1: 1 row, 2 word + c = new Column("text2".getBytes(), "word1 word2".getBytes(), System.currentTimeMillis()); + mutationMap = getMutationMap("key0".getBytes(), WordCount.COLUMN_FAMILY, c); + client.batch_mutate(mutationMap, ConsistencyLevel.ONE); logger.info("added text2"); // text3: 1000 rows, 1 word - columnName = "text3".getBytes(); - for (int i = 0; i < 1000; i++) + mutationMap = new HashMap<byte[],Map<String,List<Mutation>>>(); + for (int i=0; i<1000; i++) { - rm = new RowMutation(WordCount.KEYSPACE, ("Key" + i).getBytes()); - cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); - cf.addColumn(new Column(columnName, "word1".getBytes(), 0)); - rm.add(cf); - StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE); + c = new Column("text3".getBytes(), "word1".getBytes(), System.currentTimeMillis()); + addToMutationMap(mutationMap, ("key" + i).getBytes(), WordCount.COLUMN_FAMILY, c); } + client.batch_mutate(mutationMap, ConsistencyLevel.ONE); logger.info("added text3"); System.exit(0); } + + private static Map<byte[],Map<String,List<Mutation>>> getMutationMap(byte[] key, String cf, Column c) { + Map<byte[],Map<String,List<Mutation>>> mutationMap = new HashMap<byte[],Map<String,List<Mutation>>>(); + addToMutationMap(mutationMap, key, cf, c); + return mutationMap; + } + + private static void addToMutationMap(Map<byte[],Map<String,List<Mutation>>> mutationMap, byte[] key, String cf, Column c) + { + Map<String,List<Mutation>> cfMutation = new HashMap<String,List<Mutation>>(); + List<Mutation> mList = new ArrayList<Mutation>(); + ColumnOrSuperColumn cc = new ColumnOrSuperColumn(); + Mutation m = new Mutation(); + + 
cc.setColumn(c); + m.setColumn_or_supercolumn(cc); + mList.add(m); + cfMutation.put(cf, mList); + mutationMap.put(key, cfMutation); + } + + private static void setupKeyspace(Cassandra.Iface client) throws TException, InvalidRequestException + { + List<CfDef> cfDefList = new ArrayList<CfDef>(); + CfDef cfDef = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY); + cfDefList.add(cfDef); + + client.system_add_keyspace(new KsDef(WordCount.KEYSPACE, "org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefList)); + } + + private static Cassandra.Iface createConnection() throws TTransportException + { + if(System.getProperty("cassandra.host") == null || System.getProperty("cassandra.port") == null) + { + logger.warn("cassandra.host or cassandra.port is not defined, using default"); + } + return createConnection( System.getProperty("cassandra.host","localhost"), + Integer.valueOf(System.getProperty("cassandra.port","9160")), + Boolean.valueOf(System.getProperty("cassandra.framed", "false")) ); + } + + private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws TTransportException + { + TSocket socket = new TSocket(host, port); + TTransport trans; + + if(framed) + trans = new TFramedTransport(socket); + else + trans = socket; + + trans.open(); + TProtocol protocol = new TBinaryProtocol(trans); + + return new Cassandra.Client(protocol); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=941797&r1=941796&r2=941797&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu May 6 16:42:27 2010 @@ -24,14 +24,11 @@ package org.apache.cassandra.hadoop; 
import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.*; import com.google.common.collect.AbstractIterator; +import org.apache.cassandra.auth.AllowAllAuthenticator; import org.apache.cassandra.auth.SimpleAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; @@ -112,8 +109,8 @@ public class ColumnFamilyRecordReader ex private String startToken; private int totalRead = 0; private int i = 0; - private AbstractType comparator = DatabaseDescriptor.getComparator(keyspace, cfName); - + private AbstractType comparator = null; + private void maybeInit() { // check if we need another batch @@ -151,7 +148,17 @@ public class ColumnFamilyRecordReader ex try { client.set_keyspace(keyspace); - client.login(authRequest); + if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator)) + { + client.login(authRequest); + } + + // Get the keyspace information to get the comparator + Map<String, Map<String,String>> desc = client.describe_keyspace(keyspace); + Map<String,String> ksProps = desc.get(cfName); + String compClass = ksProps.get("CompareWith"); + comparator = (AbstractType) Class.forName(compClass).newInstance(); + rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=941797&r1=941796&r2=941797&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu May 6 16:42:27 2010 @@ -23,7 +23,6 @@ package 
org.apache.cassandra.hadoop; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.ThriftValidation; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -59,14 +58,7 @@ public class ConfigHelper { throw new UnsupportedOperationException("columnfamily may not be null"); } - try - { - ThriftValidation.validateColumnFamily(keyspace, columnFamily); - } - catch (InvalidRequestException e) - { - throw new RuntimeException(e); - } + conf.set(KEYSPACE_CONFIG, keyspace); conf.set(COLUMNFAMILY_CONFIG, columnFamily); }