Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for 
change notification.

The "Roger Mbiama" page has been changed by Roger Mbiama:
http://wiki.apache.org/cassandra/Roger%20Mbiama

Comment:
Angosso

New page:
Describe Roger Mbiama here.

conf/README.txt
Required configuration files
============================

cassandra.yaml: main Cassandra configuration file
log4j-server.properties: log4j configuration file for Cassandra server


Optional configuration files
============================

access.properties: used for authorization
passwd.properties: used for authentication
cassandra-rack.properties: used by PropertyFileSnitch

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Derive default JVM heap settings from the host's memory and CPU count.
#
# Sets the following globals:
#   MAX_HEAP_SIZE        half of the detected physical memory, as "<n>M"
#   HEAP_NEWSIZE         min(100 MB * cpu cores, 1/4 of the heap), as "<n>M"
#   system_memory_in_mb, system_cpu_cores and the intermediate *_in_mb values
#
# Detection is per-platform (Linux, FreeBSD, SunOS). On any other platform,
# or when detection yields nothing usable, 2048 MB and 2 cores are assumed.
calculate_heap_sizes()
{
    case "$(uname)" in
        Linux)
            system_memory_in_mb=$(free -m | awk '/Mem:/ {print $2}')
            system_cpu_cores=$(grep -Ec 'processor([[:space:]]+):.*' /proc/cpuinfo)
        ;;
        FreeBSD)
            system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}')
            # default to 0 so an empty reading cannot become an arithmetic
            # syntax error; 0 is replaced by the fallback below
            system_memory_in_mb=$((${system_memory_in_bytes:-0} / 1024 / 1024))
            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
        ;;
        SunOS)
            system_memory_in_mb=$(prtconf | awk '/Memory size:/ {print $3}')
            # count lines with awk so the result carries no padding
            system_cpu_cores=$(psrinfo | awk 'END {print NR}')
        ;;
        *)
            # assume reasonable defaults for e.g. a modern desktop or
            # cheap server
            system_memory_in_mb="2048"
            system_cpu_cores="2"
        ;;
    esac

    # A failed probe (missing tool, unexpected output format) must not leak
    # an empty, non-numeric or zero value into the arithmetic below.
    case "$system_memory_in_mb" in
        ''|*[!0-9]*|0) system_memory_in_mb="2048" ;;
    esac
    case "$system_cpu_cores" in
        ''|*[!0-9]*|0) system_cpu_cores="2" ;;
    esac

    max_heap_size_in_mb=$((system_memory_in_mb / 2))
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
    max_sensible_yg_per_core_in_mb="100"
    max_sensible_yg_in_mb=$((max_sensible_yg_per_core_in_mb * system_cpu_cores))

    desired_yg_in_mb=$((max_heap_size_in_mb / 4))

    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
    then
        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
    else
        HEAP_NEWSIZE="${desired_yg_in_mb}M"
    fi
}

# Override these to set the amount of memory to allocate to the JVM at
# start-up. For production use you almost certainly want to adjust
# this for your environment. MAX_HEAP_SIZE is the total amount of
# memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
# of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
# be either set or not (if you set one, set the other).
#
# The main trade-off for the young generation is that the larger it
# is, the longer GC pause times will be. The shorter it is, the more
# expensive GC will be (usually).
#
# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
# times. If in doubt, and if you do not particularly want to tweak, go with
# 100 MB per physical CPU core.

#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

# MAX_HEAP_SIZE and HEAP_NEWSIZE must be supplied together or not at all:
# with neither set they are computed from the hardware, with only one set
# start-up is refused.
if [ -z "$MAX_HEAP_SIZE" ] && [ -z "$HEAP_NEWSIZE" ]; then
    calculate_heap_sizes
elif [ -z "$MAX_HEAP_SIZE" ] || [ -z "$HEAP_NEWSIZE" ]; then
    echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)" >&2
    exit 1
fi

# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"


# Here we create the arguments that will get passed to the jvm when
# starting cassandra. Every option is appended to JVM_OPTS, so anything
# already present in JVM_OPTS is preserved.

# enable assertions.  disabling this in production will give a modest
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"

# add the jamm javaagent, except when the first word on the second line of
# `java -version` output is "OpenJDK"
check_openjdk=$("${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}')
if [ "$check_openjdk" != "OpenJDK" ]
then
    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
fi

# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
# allows lowering thread priority without being root.  see
# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"

# min and max heap sizes should be set to the same value to avoid
# stop-the-world GC pauses during resize, and so that we can lock the
# heap in memory on startup to prevent any of it from being swapped
# out.
JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"

if [ "$(uname)" = "Linux" ] ; then
    # reduce the per-thread stack size to minimize the impact of Thrift
    # thread-per-client.  (Best practice is for client connections to
    # be pooled anyway.) Only do so on Linux where it is known to be
    # supported.
    JVM_OPTS="$JVM_OPTS -Xss128k"
fi

# GC tuning options
JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"

# GC logging options -- uncomment to enable
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"

# uncomment to have Cassandra JVM listen for remote debuggers/profilers on
# port 1414
# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"

# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
# comment out this entry to enable IPv6 support).
JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"

# jmx: metrics and administration interface
#
# add this if you're having trouble connecting:
# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
#
# see
# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
# for more on configuring JMX through firewalls, etc. (Short version:
# get it working with no firewall first.)
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"

2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 /**
  * Cassandra has a back door called the Binary Memtable. The purpose of this 
backdoor is to
  * mass import large amounts of data, without using the Thrift interface.
  *
  * Inserting data through the binary memtable, allows you to skip the commit 
log overhead, and an ack
  * from Thrift on every insert. The example below utilizes Hadoop to generate 
all the data necessary 
  * to send to Cassandra, and sends it using the Binary Memtable interface. 
What Hadoop ends up doing is
  * creating the actual data that gets put into an SSTable as if you were using 
Thrift. With enough Hadoop nodes
  * inserting the data, the bottleneck at this point should become the network.
  * 
  * We recommend adjusting the compaction threshold to 0, while the import is 
running. After the import, you need
  * to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as 
this will flush the remaining data still left 
  * in memory to disk. Then it's recommended to adjust the compaction threshold
  * to its original value.
  * 
  * The example below is a sample Hadoop job, and it inserts SuperColumns. It 
can be tweaked to work with normal Columns.
  *
  * You should construct your data you want to import as rows delimited by a 
new line. You end up grouping by <Key>
  * in the mapper, so that the end result generates the data set into a column 
oriented subset. Once you get to the
  * reduce aspect, you can generate the ColumnFamilies you want inserted, and 
send it to your nodes.
  *
  * For Cassandra 0.6.4, we modified this example to wait for acks from all 
Cassandra nodes for each row
  * before proceeding to the next.  This means to keep Cassandra similarly busy 
you can either
  * 1) add more reducer tasks,
  * 2) remove the "wait for acks" block of code,
  * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
  *
  * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
  */
  
package org.apache.cassandra.bulkloader;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.io.util.DataOutputBuffer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

/**
 * Sample Hadoop job that bulk-loads SuperColumns into Cassandra by sending
 * pre-serialized column families with the BINARY verb.
 *
 * The mapper passes key/value pairs through unchanged. The reducer builds one
 * ColumnFamily per key from the grouped values, sends it to every natural
 * endpoint of that key and waits for each endpoint to acknowledge it.
 */
public class CassandraBulkLoader {
    /** URI under which storage-conf.xml is registered with, and released from, the DistributedCache. */
    private static final String STORAGE_CONF_URI = "/cassandra/storage-conf.xml#storage-conf.xml";

    /**
     * Copies only the valid bytes of a Text.
     *
     * Text.getBytes() exposes the backing buffer, of which only the first
     * getLength() bytes are valid; using it directly as a row key can append
     * stale bytes from a previously held, longer value.
     *
     * @param text the Hadoop Text to copy
     * @return a new array holding exactly the text's getLength() bytes
     */
    private static byte[] validBytes(Text text)
    {
        byte[] copy = new byte[text.getLength()];
        System.arraycopy(text.getBytes(), 0, copy, 0, text.getLength());
        return copy;
    }

    /** Identity mapper: emits every input key/value pair unchanged. */
    public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // This is a simple key/value mapper.
            output.collect(key, value);
        }
    }

    /** Reducer that turns the values of one key into a ColumnFamily and ships it to Cassandra. */
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private Path[] localFiles;
        private JobConf jobconf;

        /**
         * Points Cassandra at the directory of the first cached file and
         * starts the StorageService client.
         *
         * @param job the job configuration; kept for use in close()
         * @throws RuntimeException if the cache cannot be read or is empty,
         *         the client cannot be started, or the thread is interrupted
         */
        public void configure(JobConf job) {
            this.jobconf = job;

            // Get the cached files
            try
            {
                localFiles = DistributedCache.getLocalCacheFiles(job);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            if (localFiles == null || localFiles.length == 0)
            {
                throw new RuntimeException("no files in the distributed cache; expected " + STORAGE_CONF_URI);
            }
            String cassConfig = localFiles[0].getParent().toString();

            System.setProperty("storage-config", cassConfig);

            try
            {
                StorageService.instance.initClient();
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            try
            {
                // NOTE(review): presumably gives the client time to learn the
                // ring before the first reduce() call -- confirm
                Thread.sleep(10 * 1000);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Releases the cached config, pauses briefly and stops the
         * StorageService client. The client is stopped even when releasing
         * the cache or the pause fails.
         */
        public void close()
        {
            try
            {
                // release the cache
                DistributedCache.releaseCache(new URI(STORAGE_CONF_URI), this.jobconf);
                // Sleep just in case the number of keys we send over is small
                Thread.sleep(3 * 1000);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            catch (URISyntaxException e)
            {
                throw new RuntimeException(e);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            finally
            {
                StorageService.instance.stopClient();
            }
        }

        /**
         * Builds a "Super1" column family in "Keyspace1" from the values of
         * one key, sends it to every natural endpoint and waits for the acks.
         *
         * Each value is split on the U+0001 character; fields 1, 2 and 3 are
         * used as super column name, column name and column value (field 0 is
         * not read).
         *
         * @throws IOException if a value has fewer than four fields or the
         *         output collector fails
         * @throws RuntimeException if an endpoint does not answer within the
         *         configured RPC timeout
         */
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            String keyspace = "Keyspace1";
            String cfName = "Super1";
            List<ColumnFamily> columnFamilies = new LinkedList<ColumnFamily>();
            byte[] rowKey = validBytes(key);

            /* Create a column family */
            ColumnFamily columnFamily = ColumnFamily.create(keyspace, cfName);
            while (values.hasNext()) {
                // Split the value (line based on your own delimiter)
                String line = values.next().toString();
                String[] fields = line.split("\1");
                if (fields.length < 4)
                {
                    throw new IOException("malformed input for key " + key + ": expected at least 4 fields, got " + fields.length);
                }
                String SuperColumnName = fields[1];
                String ColumnName = fields[2];
                String ColumnValue = fields[3];
                int timestamp = 0;
                // encode the value as UTF-8 like the names, not with the platform default charset
                columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")),
                                       ColumnValue.getBytes("UTF-8"),
                                       new TimestampClock(timestamp));
            }

            columnFamilies.add(columnFamily);

            /* Get serialized message to send to cluster */
            Message message = createMessage(keyspace, rowKey, cfName, columnFamilies);
            List<IAsyncResult> results = new ArrayList<IAsyncResult>();
            for (InetAddress endpoint : StorageService.instance.getNaturalEndpoints(keyspace, rowKey))
            {
                /* Send message to end point */
                results.add(MessagingService.instance.sendRR(message, endpoint));
            }
            /* wait for acks */
            for (IAsyncResult result : results)
            {
                try
                {
                    result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e)
                {
                    // you should probably add retry logic here
                    throw new RuntimeException(e);
                }
            }

            output.collect(key, new Text(" inserted into Cassandra node(s)"));
        }
    }

    /**
     * Configures and runs the bulk-load job.
     *
     * @param args args[1] is the input path, args[2] the output path and the
     *        optional args[3] the number of reduce tasks; args[0] is not read
     * @throws IllegalArgumentException if fewer than three arguments are given
     * @throws RuntimeException if the cache URI is invalid or the job fails
     */
    public static void runJob(String[] args)
    {
        if (args.length < 3)
        {
            throw new IllegalArgumentException("expected at least 3 arguments: <unused> <input path> <output path> [<reduce tasks>]");
        }

        JobConf conf = new JobConf(CassandraBulkLoader.class);

        if(args.length >= 4)
        {
          conf.setNumReduceTasks(Integer.parseInt(args[3]));
        }

        try
        {
            // We store the cassandra storage-conf.xml on the HDFS cluster
            DistributedCache.addCacheFile(new URI(STORAGE_CONF_URI), conf);
        }
        catch (URISyntaxException e)
        {
            throw new RuntimeException(e);
        }
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setJobName("CassandraBulkLoader_v2");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        try
        {
            JobClient.runJob(conf);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Wraps the given column families in a BINARY-verb row mutation message.
     *
     * Each column family is serialized with its indexes and stored as the
     * value of one column (named by the family's id) inside a carrier
     * ColumnFamily, which is then added to a RowMutation for the key.
     *
     * @param Keyspace keyspace the row belongs to
     * @param Key row key bytes
     * @param CFName column family used to look up comparators and the id
     * @param ColumnFamiles column families to serialize into the message
     * @return the message to send to the endpoints
     * @throws RuntimeException if the message cannot be built
     */
    public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily> ColumnFamiles)
    {
        ColumnFamily baseColumnFamily;
        DataOutputBuffer bufOut = new DataOutputBuffer();
        RowMutation rm;
        Message message;
        Column column;

        /* Get the first column family from list, this is just to get past validation */
        baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
                                            ClockType.Timestamp,
                                            DatabaseDescriptor.getComparator(Keyspace, CFName),
                                            DatabaseDescriptor.getSubComparator(Keyspace, CFName),
                                            TimestampReconciler.instance,
                                            CFMetaData.getId(Keyspace, CFName));

        for(ColumnFamily cf : ColumnFamiles) {
            bufOut.reset();
            ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
            // copy only the bytes written so far out of the buffer
            byte[] data = new byte[bufOut.getLength()];
            System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());

            column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0));
            baseColumnFamily.addColumn(column);
        }
        rm = new RowMutation(Keyspace, Key);
        rm.add(baseColumnFamily);

        try
        {
            /* Make message */
            message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }

        return message;
    }

    /** Command-line entry point; forwards the arguments to runJob. */
    public static void main(String[] args) throws Exception
    {
        runJob(args);
    }
}

#!/bin/bash
#
# /etc/init.d/cassandra
#
# Startup script for Cassandra
#
# chkconfig: 2345 20 80
# description: Starts and stops Cassandra
#
# Usage: cassandra start|stop|restart|reload|status
# Exit status: 0 on success, 1 on failure or bad usage, 3 from "status"
# when Cassandra is not running.

. /etc/rc.d/init.d/functions

export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
export CASSANDRA_HOME=/usr/share/cassandra/
export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
export CASSANDRA_CONF=/etc/cassandra/conf
export CASSANDRA_OWNR=cassandra
log_file=/var/log/cassandra/cassandra.log
pid_file=/var/run/cassandra/cassandra.pid
CASSANDRA_PROG=/usr/sbin/cassandra
# seconds to wait for the process to exit after "stop" sends the signal
stop_timeout=30

# Print the PID recorded in $pid_file if it names a live process.
# Returns non-zero (and prints nothing) when the file is missing, does not
# contain a number, or the process is gone.
cassandra_pid() {
    local pid
    [ -r "$pid_file" ] || return 1
    pid=$(cat "$pid_file") || return 1
    case "$pid" in
        ''|*[!0-9]*) return 1 ;;
    esac
    ps -p "$pid" > /dev/null 2>&1 || return 1
    echo "$pid"
}

# Start Cassandra as $CASSANDRA_OWNR unless it is already running.
# The launcher's output replaces the contents of $log_file.
cassandra_start() {
    echo -n "Starting Cassandra: "
    if cassandra_pid > /dev/null; then
        echo "already running"
        return 0
    fi
    if su "$CASSANDRA_OWNR" -c "$CASSANDRA_PROG -p $pid_file" > "$log_file" 2>&1; then
        echo "OK"
    else
        echo "FAILED (see $log_file)"
        return 1
    fi
}

# Signal the running Cassandra process and wait up to $stop_timeout seconds
# for it to exit. Succeeds without doing anything if it is not running.
cassandra_stop() {
    local pid
    local waited=0
    echo -n "Shutdown Cassandra: "
    if ! pid=$(cassandra_pid); then
        echo "not running"
        return 0
    fi
    if ! su "$CASSANDRA_OWNR" -c "kill $pid"; then
        echo "FAILED"
        return 1
    fi
    # wait for the process to go away so that a following start (restart)
    # does not race with the shutdown
    while ps -p "$pid" > /dev/null 2>&1; do
        if [ "$waited" -ge "$stop_timeout" ]; then
            echo "FAILED (still running after ${stop_timeout}s)"
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    echo "OK"
}

# Report whether Cassandra is running: returns 0 if so, 3 otherwise.
cassandra_status() {
    local pid
    if pid=$(cassandra_pid); then
        echo "Cassandra is running (pid $pid)"
        return 0
    fi
    echo "Cassandra is not running"
    return 3
}

case "$1" in
    start)
        cassandra_start
        ;;
    stop)
        cassandra_stop
        ;;
    reload|restart)
        cassandra_stop && cassandra_start
        ;;
    status)
        cassandra_status
        ;;
    *)
        echo "Usage: $(basename "$0") start|stop|restart|reload|status" >&2
        exit 1
        ;;
esac

# propagate the result of the selected action
exit $?

Reply via email to