Author: johan
Date: Thu May  6 16:42:27 2010
New Revision: 941797

URL: http://svn.apache.org/viewvc?rev=941797&view=rev
Log:
Update word count example to work with changes in trunk. Word count setup now 
uses the thrift interface. Patch by Jeremy Hanna, review by johan. 
CASSANDRA-1030

Added:
    cassandra/trunk/contrib/word_count/cassandra.yaml
Removed:
    cassandra/trunk/contrib/word_count/storage-conf.xml
Modified:
    cassandra/trunk/contrib/word_count/bin/word_count_setup
    cassandra/trunk/contrib/word_count/build.xml
    cassandra/trunk/contrib/word_count/src/WordCountSetup.java
    
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count_setup?rev=941797&r1=941796&r2=941797&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/bin/word_count_setup (original)
+++ cassandra/trunk/contrib/word_count/bin/word_count_setup Thu May  6 16:42:27 
2010
@@ -53,4 +53,8 @@ if [ "x$JAVA" = "x" ]; then
     exit 1
 fi
 
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountSetup
+HOST=localhost
+PORT=9160
+FRAMED=false
+
+$JAVA -Xmx1G -ea -Dcassandra.host=$HOST -Dcassandra.port=$PORT 
-Dcassandra.framed=$FRAMED -cp $CLASSPATH WordCountSetup

Modified: cassandra/trunk/contrib/word_count/build.xml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/build.xml?rev=941797&r1=941796&r2=941797&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/build.xml (original)
+++ cassandra/trunk/contrib/word_count/build.xml Thu May  6 16:42:27 2010
@@ -77,7 +77,7 @@
            <zipfileset dir="${cassandra.dir}/build/lib/jars/" prefix="lib">
                <include name="**/*.jar" />
            </zipfileset>
-           <fileset file="${basedir}/storage-conf.xml" />
+           <fileset file="${basedir}/cassandra.yaml" />
         </jar>
     </target>
 

Added: cassandra/trunk/contrib/word_count/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/cassandra.yaml?rev=941797&view=auto
==============================================================================
--- cassandra/trunk/contrib/word_count/cassandra.yaml (added)
+++ cassandra/trunk/contrib/word_count/cassandra.yaml Thu May  6 16:42:27 2010
@@ -0,0 +1,199 @@
+# Cassandra storage config YAML 
+# See http://wiki.apache.org/cassandra/StorageConfiguration for
+# explanations of configuration directives.
+
+# name of the cluster
+cluster_name: 'Test Cluster'
+
+# Set to true to make new [non-seed] nodes automatically migrate the
+# right data to themselves.
+auto_bootstrap: false
+
+# authentication backend, implementing IAuthenticator; used to limit keyspace access
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
+
+# any IPartitioner may be used, including your own as long as it is on
+# the classpath.  Out of the box, Cassandra provides
+# org.apache.cassandra.dht.RandomPartitioner
+# org.apache.cassandra.dht.OrderPreservingPartitioner, and
+# org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+
+# directories where Cassandra should store data on disk.
+data_file_directories:
+    - /var/lib/cassandra/data
+
+# Addresses of hosts that are deemed contact points. 
+# Cassandra nodes use this list of hosts to find each other and learn
+# the topology of the ring.  You must change this if you are running
+# multiple nodes!
+seeds:
+    - 127.0.0.1
+
+# Access mode.  mmapped i/o is substantially faster, but only practical on
+# a 64bit machine (which notably does not include EC2 "small" instances)
+# or relatively small datasets.  "auto", the safe choice, will enable
+# mmapping on a 64bit JVM.  Other values are "mmap", "mmap_index_only"
+# (which may allow you to get part of the benefits of mmap on a 32bit
+# machine by mmapping only index files) and "standard".
+# (The buffer size settings that follow only apply to standard,
+# non-mmapped i/o.)
+disk_access_mode: auto
+
+# Unlike most systems, in Cassandra writes are faster than reads, so
+# you can afford more of those in parallel.  A good rule of thumb is 2
+# concurrent reads per processor core.  Increase concurrent_writes to
+# the number of clients writing at once if you set commitlog_sync to
+# "batch".
+concurrent_reads: 8
+concurrent_writes: 32
+
+# Buffer size to use when performing contiguous column slices. 
+# Increase this to the size of the column slices you typically perform
+sliced_buffer_size_in_kb: 64
+
+# TCP port, for commands and data
+storage_port: 7000
+
+# Address to bind to and tell other nodes to connect to. You _must_
+# change this if you want multiple nodes to be able to communicate!
+listen_address: localhost
+
+# The address to bind the Thrift RPC service to
+rpc_address: localhost
+# port for Thrift to listen on
+rpc_port: 9160
+# Whether or not to use a framed transport for Thrift.
+thrift_framed_transport: false
+snapshot_before_compaction: false
+
+# The threshold size in megabytes the binary memtable must grow to,
+# before it's submitted for flushing to disk.
+binary_memtable_throughput_in_mb: 256
+# Number of minutes to keep a memtable in memory
+memtable_flush_after_mins: 60
+# Size of the memtable in memory before it is dumped
+memtable_throughput_in_mb: 64
+# Number of objects in millions in the memtable before it is dumped
+memtable_operations_in_millions: 0.3
+# Buffer size to use when flushing memtables to disk.
+flush_data_buffer_size_in_mb: 32
+# Increase (decrease) the index buffer size relative to the data
+# buffer if you have few (many) columns per key.
+flush_index_buffer_size_in_mb: 8
+
+column_index_size_in_kb: 64
+row_warning_threshold_in_mb: 512
+
+# commit log
+commitlog_directory: /var/lib/cassandra/commitlog
+
+# Size to allow commitlog to grow to before creating a new segment 
+commitlog_rotation_threshold_in_mb: 128
+
+# commitlog_sync may be either "periodic" or "batch".
+# When in batch mode, Cassandra won't ack writes until the commit log
+# has been fsynced to disk.  It will wait up to
+# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
+# performing the sync.
+commitlog_sync: periodic
+
+# In "periodic" mode, writes may be acked immediately and the
+# commit log is simply synced every commitlog_sync_period_in_ms
+# milliseconds.
+commitlog_sync_period_in_ms: 10000
+
+# Time to wait for a reply from other nodes before failing the command 
+rpc_timeout_in_ms: 10000
+
+# time to wait before garbage collecting tombstones (deletion markers)
+gc_grace_seconds: 864000
+
+# endpoint_snitch -- Set this to a class that implements
+# IEndpointSnitch, which will let Cassandra know enough
+# about your network topology to route requests efficiently.
+# Out of the box, Cassandra provides
+# org.apache.cassandra.locator.SimpleSnitch,
+# org.apache.cassandra.locator.RackInferringSnitch, and
+# org.apache.cassandra.locator.PropertyFileSnitch.
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+
+# A ColumnFamily is the Cassandra concept closest to a relational table. 
+#
+# Keyspaces are separate groups of ColumnFamilies.  Except in very
+# unusual circumstances you will have one Keyspace per application.
+#
+# Keyspace required parameters:
+# - name: name of the keyspace; "system" and "definitions" are 
+#   reserved for Cassandra Internals.
+# - replica_placement_strategy: the class that determines how replicas
+#   are distributed among nodes.  Must implement IReplicaPlacementStrategy.
+#   Out of the box, Cassandra provides 
+#   org.apache.cassandra.locator.RackUnawareStrategy and
+#   org.apache.cassandra.locator.RackAwareStrategy.  RackAwareStrategy
+#   places one replica in each of two datacenters, and other replicas in
+#   different racks within one datacenter.
+# - replication_factor: Number of replicas of each row
+# - column_families: column families associated with this keyspace
+#
+# ColumnFamily required parameters:
+# - name: name of the ColumnFamily.  Must not contain the character "-".
+# - compare_with: tells Cassandra how to sort the columns for slicing
+#   operations. The default is BytesType, which is a straightforward
+#   lexical comparison of the bytes in each column.  Other options are
+#   AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType, and LongType.
+#   You can also specify the fully-qualified class name to a class of
+#   your choice extending org.apache.cassandra.db.marshal.AbstractType.
+#
+# ColumnFamily optional parameters:
+# - keys_cached: specifies the number of keys per sstable whose
+#   locations we keep in memory in "mostly LRU" order.  (JUST the key
+#   locations, NOT any column values.) Specify a fraction (value less
+#   than 1) or an absolute number of keys to cache.  Defaults to 200000
+#   keys.
+# - rows_cached: specifies the number of rows whose entire contents we
+#   cache in memory. Do not use this on ColumnFamilies with large rows,
+#   or ColumnFamilies with high write:read ratios. Specify a fraction
+#   (value less than 1) or an absolute number of rows to cache.
+#   Defaults to 0. (i.e. row caching is off by default)
+# - comment: used to attach additional human-readable information about 
+#   the column family to its definition.
+# - read_repair_chance: specifies the probability with which read
+#   repairs should be invoked on non-quorum reads.  must be between 0
+#   and 1. defaults to 1.0 (always read repair).
+# - preload_row_cache: If true, will populate row cache on startup.
+#   Defaults to false.
+#
+keyspaces:
+    - name: Keyspace1
+      replica_placement_strategy: 
org.apache.cassandra.locator.RackUnawareStrategy
+      replication_factor: 1
+      column_families:
+        - name: Standard1
+          compare_with: BytesType
+
+        - name: Standard2
+          compare_with: UTF8Type
+          read_repair_chance: 0.1
+          keys_cached: 100
+
+        - name: StandardByUUID1
+          compare_with: TimeUUIDType
+
+        - name: Super1
+          column_type: Super
+          compare_with: BytesType
+          compare_subcolumns_with: BytesType
+
+        - name: Super2
+          column_type: Super
+          compare_subcolumns_with: UTF8Type
+          preload_row_cache: true
+          rows_cached: 10000
+          keys_cached: 50
+          comment: 'A column family with supercolumns, whose column and 
subcolumn names are UTF8 strings'
+
+        - name: Super3
+          column_type: Super
+          compare_with: LongType
+          comment: 'A column family with supercolumns, whose column names are 
Longs (8 bytes)'

Modified: cassandra/trunk/contrib/word_count/src/WordCountSetup.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCountSetup.java?rev=941797&r1=941796&r2=941797&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCountSetup.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCountSetup.java Thu May  6 
16:42:27 2010
@@ -16,16 +16,19 @@
  * limitations under the License.
  */
 
-import java.util.Arrays;
+import java.util.*;
 
+import org.apache.cassandra.thrift.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-
 public class WordCountSetup
 {
     private static final Logger logger = 
LoggerFactory.getLogger(WordCountSetup.class);
@@ -34,47 +37,93 @@ public class WordCountSetup
 
     public static void main(String[] args) throws Exception
     {
-        StorageService.instance.initClient();
-        logger.info("Sleeping " + WordCount.RING_DELAY);
-        Thread.sleep(WordCount.RING_DELAY);
-        assert !StorageService.instance.getLiveNodes().isEmpty();
-
-        RowMutation rm;
-        ColumnFamily cf;
-        byte[] columnName;
+        Cassandra.Iface client = createConnection();
+
+        setupKeyspace(client);
+
+        client.set_keyspace(WordCount.KEYSPACE);
 
-        // text0: no rows
+        Map<byte[], Map<String,List<Mutation>>> mutationMap;
+        Column c;
 
         // text1: 1 row, 1 word
-        columnName = "text1".getBytes();
-        rm = new RowMutation(WordCount.KEYSPACE, "Key0".getBytes());
-        cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
-        cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
-        rm.add(cf);
-        StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
+        c = new Column("text1".getBytes(), "word1".getBytes(), 
System.currentTimeMillis());
+        mutationMap = getMutationMap("key0".getBytes(), 
WordCount.COLUMN_FAMILY, c);
+        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
         logger.info("added text1");
 
-        // text2: 1 row, 2 words
-        columnName = "text2".getBytes();
-        rm = new RowMutation(WordCount.KEYSPACE, "Key0".getBytes());
-        cf = ColumnFamily.create(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
-        cf.addColumn(new Column(columnName, "word1 word2".getBytes(), 0));
-        rm.add(cf);
-        StorageProxy.mutateBlocking(Arrays.asList(rm), ConsistencyLevel.ONE);
+        // text2: 1 row, 2 words
+        c = new Column("text2".getBytes(), "word1 word2".getBytes(), 
System.currentTimeMillis());
+        mutationMap = getMutationMap("key0".getBytes(), 
WordCount.COLUMN_FAMILY, c);
+        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
         logger.info("added text2");
 
         // text3: 1000 rows, 1 word
-        columnName = "text3".getBytes();
-        for (int i = 0; i < 1000; i++)
+        mutationMap = new HashMap<byte[],Map<String,List<Mutation>>>();
+        for (int i=0; i<1000; i++)
         {
-            rm = new RowMutation(WordCount.KEYSPACE, ("Key" + i).getBytes());
-            cf = ColumnFamily.create(WordCount.KEYSPACE, 
WordCount.COLUMN_FAMILY);
-            cf.addColumn(new Column(columnName, "word1".getBytes(), 0));
-            rm.add(cf);
-            StorageProxy.mutateBlocking(Arrays.asList(rm), 
ConsistencyLevel.ONE);
+            c = new Column("text3".getBytes(), "word1".getBytes(), 
System.currentTimeMillis());
+            addToMutationMap(mutationMap, ("key" + i).getBytes(), 
WordCount.COLUMN_FAMILY, c);
         }
+        client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
         logger.info("added text3");
 
         System.exit(0);
     }
+
+    private static Map<byte[],Map<String,List<Mutation>>> 
getMutationMap(byte[] key, String cf, Column c) {
+        Map<byte[],Map<String,List<Mutation>>> mutationMap = new 
HashMap<byte[],Map<String,List<Mutation>>>();
+        addToMutationMap(mutationMap, key, cf, c);
+        return mutationMap;
+    }
+
+    private static void 
addToMutationMap(Map<byte[],Map<String,List<Mutation>>> mutationMap, byte[] 
key, String cf, Column c)
+    {
+        Map<String,List<Mutation>> cfMutation = new 
HashMap<String,List<Mutation>>();
+        List<Mutation> mList = new ArrayList<Mutation>();
+        ColumnOrSuperColumn cc = new ColumnOrSuperColumn();
+        Mutation m = new Mutation();
+
+        cc.setColumn(c);
+        m.setColumn_or_supercolumn(cc);
+        mList.add(m);
+        cfMutation.put(cf, mList);
+        mutationMap.put(key, cfMutation);
+    }
+
+    private static void setupKeyspace(Cassandra.Iface client) throws 
TException, InvalidRequestException
+    {
+        List<CfDef> cfDefList = new ArrayList<CfDef>();
+        CfDef cfDef = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
+        cfDefList.add(cfDef);
+
+        client.system_add_keyspace(new KsDef(WordCount.KEYSPACE, 
"org.apache.cassandra.locator.RackUnawareStrategy", 1, cfDefList));
+    }
+
+    private static Cassandra.Iface createConnection() throws 
TTransportException
+    {
+        if(System.getProperty("cassandra.host") == null || 
System.getProperty("cassandra.port") == null)
+        {
+           logger.warn("cassandra.host or cassandra.port is not defined, using 
default");
+        }
+        return createConnection( 
System.getProperty("cassandra.host","localhost"),
+                                 
Integer.valueOf(System.getProperty("cassandra.port","9160")),
+                                 
Boolean.valueOf(System.getProperty("cassandra.framed", "false")) );
+    }
+
+    private static Cassandra.Client createConnection(String host, Integer 
port, boolean framed) throws TTransportException
+    {
+        TSocket socket = new TSocket(host, port);
+        TTransport trans;
+
+        if(framed)
+            trans = new TFramedTransport(socket);
+        else
+            trans = socket;
+
+        trans.open();
+        TProtocol protocol = new TBinaryProtocol(trans);
+
+        return new Cassandra.Client(protocol);
+    }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=941797&r1=941796&r2=941797&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 Thu May  6 16:42:27 2010
@@ -24,14 +24,11 @@ package org.apache.cassandra.hadoop;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.auth.SimpleAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -112,8 +109,8 @@ public class ColumnFamilyRecordReader ex
         private String startToken;
         private int totalRead = 0;
         private int i = 0;
-        private AbstractType comparator = 
DatabaseDescriptor.getComparator(keyspace, cfName);
-        
+        private AbstractType comparator = null;
+
         private void maybeInit()
         {
             // check if we need another batch 
@@ -151,7 +148,17 @@ public class ColumnFamilyRecordReader ex
             try
             {
                 client.set_keyspace(keyspace);
-               client.login(authRequest);
+                if (!(DatabaseDescriptor.getAuthenticator() instanceof 
AllowAllAuthenticator))
+                {
+                    client.login(authRequest);
+                }
+
+                // Get the keyspace information to get the comparator
+                Map<String, Map<String,String>> desc = 
client.describe_keyspace(keyspace);
+                Map<String,String> ksProps = desc.get(cfName);
+                String compClass = ksProps.get("CompareWith");
+                comparator = (AbstractType) 
Class.forName(compClass).newInstance();
+
                 rows = client.get_range_slices(new ColumnParent(cfName),
                                                predicate,
                                                keyRange,

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=941797&r1=941796&r2=941797&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu 
May  6 16:42:27 2010
@@ -23,7 +23,6 @@ package org.apache.cassandra.hadoop;
 
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -59,14 +58,7 @@ public class ConfigHelper
         {
             throw new UnsupportedOperationException("columnfamily may not be 
null");
         }
-        try
-        {
-            ThriftValidation.validateColumnFamily(keyspace, columnFamily);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
+
         conf.set(KEYSPACE_CONFIG, keyspace);
         conf.set(COLUMNFAMILY_CONFIG, columnFamily);
     }


Reply via email to