Dear Wiki user, You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.
The "Roger Mbiama" page has been changed by Roger Mbiama:
http://wiki.apache.org/cassandra/Roger%20Mbiama
Comment: Angosso
New page: Describe Roger Mbiama here.

conf/README.txt

Required configuration files
============================

cassandra.yaml: main Cassandra configuration file
log4j-server.properties: log4j configuration file for Cassandra server

Optional configuration files
============================

access.properties: used for authorization
passwd.properties: used for authentication
cassandra-rack.properties: used by PropertyFileSnitch

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Derive sensible JVM heap defaults from the hardware we are running on.
# Sets MAX_HEAP_SIZE to half of system memory, and HEAP_NEWSIZE to
# min(100 MB per CPU core, 1/4 of the heap).
calculate_heap_sizes()
{
    case "`uname`" in
        Linux)
            system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
            system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
            ;;
        FreeBSD)
            system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
            system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
            system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
            ;;
        SunOS)
            system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
            system_cpu_cores=`psrinfo | wc -l`
            ;;
        *)
            # assume reasonable defaults for e.g. a modern desktop or
            # cheap server
            system_memory_in_mb="2048"
            system_cpu_cores="2"
            ;;
    esac
    # FIX(review): the original ended each OS arm with `break`, but `break`
    # is only valid inside for/while/until loops; `;;` already terminates a
    # case arm, and a stray `break` is an error in POSIX shells.

    max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
    max_sensible_yg_per_core_in_mb="100"
    max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`
    desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`
    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
    then
        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
    else
        HEAP_NEWSIZE="${desired_yg_in_mb}M"
    fi
}

# Override these to set the amount of memory to allocate to the JVM at
# start-up. For production use you almost certainly want to adjust
# this for your environment. MAX_HEAP_SIZE is the total amount of
# memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
# of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
# be either set or not (if you set one, set the other).
#
# The main trade-off for the young generation is that the larger it
# is, the longer GC pause times will be. The shorter it is, the more
# expensive GC will be (usually).
#
# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
# times. If in doubt, and if you do not particularly want to tweak, go with
# 100 MB per physical CPU core.

#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

# Both unset: compute defaults.  Exactly one set: refuse to start, since
# mismatched heap/new-gen settings are almost certainly a mistake.
if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
    calculate_heap_sizes
elif [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then
    echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
    exit 1
fi

# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"

# Here we create the arguments that will get passed to the jvm when
# starting cassandra.

# enable assertions.  disabling this in production will give a modest
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"

# add the jamm javaagent -- skipped on OpenJDK, detected by inspecting the
# second line of `java -version` output
check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
if [ "$check_openjdk" != "OpenJDK" ]
then
    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
fi

# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
# allows lowering thread priority without being root. see
# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"

# min and max heap sizes should be set to the same value to avoid
# stop-the-world GC pauses during resize, and so that we can lock the
# heap in memory on startup to prevent any of it from being swapped
# out.
JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"

if [ "`uname`" = "Linux" ] ; then
    # reduce the per-thread stack size to minimize the impact of Thrift
    # thread-per-client. (Best practice is for client connections to
    # be pooled anyway.) Only do so on Linux where it is known to be
    # supported.
    JVM_OPTS="$JVM_OPTS -Xss128k"
fi

# GC tuning options
JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"

# GC logging options -- uncomment to enable
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"

# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"

# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
# comment out this entry to enable IPv6 support).
JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"

# jmx: metrics and administration interface
#
# add this if you're having trouble connecting:
# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
#
# see
# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
# for more on configuring JMX through firewalls, etc. (Short version:
# get it working with no firewall first.)
# expose JMX unauthenticated and without SSL on JMX_PORT (suitable only for
# trusted networks -- see the firewall notes above)
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Cassandra has a back door called the Binary Memtable. The purpose of this back door is to
 * mass-import large amounts of data without using the Thrift interface.
 *
 * Inserting data through the binary memtable allows you to skip the commit log overhead, and an ack
 * from Thrift on every insert. The example below utilizes Hadoop to generate all the data necessary
 * to send to Cassandra, and sends it using the Binary Memtable interface. What Hadoop ends up doing is
 * creating the actual data that gets put into an SSTable as if you were using Thrift. With enough Hadoop nodes
 * inserting the data, the bottleneck at this point should become the network.
 *
 * We recommend adjusting the compaction threshold to 0 while the import is running. After the import, you need
 * to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as this will flush the remaining data still left
 * in memory to disk. Then it is recommended to adjust the compaction threshold back to its original value.
 *
 * The example below is a sample Hadoop job, and it inserts SuperColumns. It can be tweaked to work with normal Columns.
 *
 * You should construct the data you want to import as rows delimited by a new line. You end up grouping by <Key>
 * in the mapper, so that the end result generates the data set into a column-oriented subset. Once you get to the
 * reduce aspect, you can generate the ColumnFamilies you want inserted, and send them to your nodes.
 *
 * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row
 * before proceeding to the next. This means that to keep Cassandra similarly busy you can either
 * 1) add more reducer tasks,
 * 2) remove the "wait for acks" block of code,
 * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
 *
 * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
*/ package org.apache.cassandra.bulkloader; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.clock.TimestampReconciler; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.BigIntegerToken; import org.apache.cassandra.io.util.DataOutputBuffer; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; public class CassandraBulkLoader { public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> { private Text word = new Text(); public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { // This is a simple key/value mapper. 
output.collect(key, value); } } public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { private Path[] localFiles; private ArrayList<String> tokens = new ArrayList<String>(); private JobConf jobconf; public void configure(JobConf job) { this.jobconf = job; String cassConfig; // Get the cached files try { localFiles = DistributedCache.getLocalCacheFiles(job); } catch (IOException e) { throw new RuntimeException(e); } cassConfig = localFiles[0].getParent().toString(); System.setProperty("storage-config",cassConfig); try { StorageService.instance.initClient(); } catch (IOException e) { throw new RuntimeException(e); } try { Thread.sleep(10*1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } public void close() { try { // release the cache DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf); } catch (IOException e) { throw new RuntimeException(e); } catch (URISyntaxException e) { throw new RuntimeException(e); } try { // Sleep just in case the number of keys we send over is small Thread.sleep(3*1000); } catch (InterruptedException e) { throw new RuntimeException(e); } StorageService.instance.stopClient(); } public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { ColumnFamily columnFamily; String keyspace = "Keyspace1"; String cfName = "Super1"; Message message; List<ColumnFamily> columnFamilies; columnFamilies = new LinkedList<ColumnFamily>(); String line; /* Create a column family */ columnFamily = ColumnFamily.create(keyspace, cfName); while (values.hasNext()) { // Split the value (line based on your own delimiter) line = values.next().toString(); String[] fields = line.split("\1"); String SuperColumnName = fields[1]; String ColumnName = fields[2]; String ColumnValue = fields[3]; int timestamp = 0; columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), 
ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), new TimestampClock(timestamp)); } columnFamilies.add(columnFamily); /* Get serialized message to send to cluster */ message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies); List<IAsyncResult> results = new ArrayList<IAsyncResult>(); for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes())) { /* Send message to end point */ results.add(MessagingService.instance.sendRR(message, endpoint)); } /* wait for acks */ for (IAsyncResult result : results) { try { result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // you should probably add retry logic here throw new RuntimeException(e); } } output.collect(key, new Text(" inserted into Cassandra node(s)")); } } public static void runJob(String[] args) { JobConf conf = new JobConf(CassandraBulkLoader.class); if(args.length >= 4) { conf.setNumReduceTasks(new Integer(args[3])); } try { // We store the cassandra storage-conf.xml on the HDFS cluster DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf); } catch (URISyntaxException e) { throw new RuntimeException(e); } conf.setInputFormat(KeyValueTextInputFormat.class); conf.setJobName("CassandraBulkLoader_v2"); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(conf, new Path(args[1])); FileOutputFormat.setOutputPath(conf, new Path(args[2])); try { JobClient.runJob(conf); } catch (IOException e) { throw new RuntimeException(e); } } public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily> ColumnFamiles) { ColumnFamily baseColumnFamily; DataOutputBuffer bufOut = new DataOutputBuffer(); RowMutation rm; Message message; Column column; /* Get the first column family from list, this is just to get past validation */ 
baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard, ClockType.Timestamp, DatabaseDescriptor.getComparator(Keyspace, CFName), DatabaseDescriptor.getSubComparator(Keyspace, CFName), TimestampReconciler.instance, CFMetaData.getId(Keyspace, CFName)); for(ColumnFamily cf : ColumnFamiles) { bufOut.reset(); ColumnFamily.serializer().serializeWithIndexes(cf, bufOut); byte[] data = new byte[bufOut.getLength()]; System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength()); column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0)); baseColumnFamily.addColumn(column); } rm = new RowMutation(Keyspace, Key); rm.add(baseColumnFamily); try { /* Make message */ message = rm.makeRowMutationMessage(StorageService.Verb.BINARY); } catch (IOException e) { throw new RuntimeException(e); } return message; } public static void main(String[] args) throws Exception { runJob(args); } } #!/bin/bash # # /etc/init.d/cassandra # # Startup script for Cassandra # # chkconfig: 2345 20 80 # description: Starts and stops Cassandra . /etc/rc.d/init.d/functions export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/ export CASSANDRA_HOME=/usr/share/cassandra/ export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh export CASSANDRA_CONF=/etc/cassandra/conf export CASSANDRA_OWNR=cassandra log_file=/var/log/cassandra/cassandra.log pid_file=/var/run/cassandra/cassandra.pid CASSANDRA_PROG=/usr/sbin/cassandra case "$1" in start) # Cassandra startup echo -n "Starting Cassandra: " su $CASSANDRA_OWNR -c "$CASSANDRA_PROG -p $pid_file" > $log_file 2>&1 echo "OK" ;; stop) # Cassandra shutdown echo -n "Shutdown Cassandra: " su $CASSANDRA_OWNR -c "kill `cat $pid_file`" echo "OK" ;; reload|restart) $0 stop $0 start ;; status) ;; *) echo "Usage: `basename $0` start|stop|restart|reload" exit 1 esac exit 0