Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-26 Thread Patrik Modesto
On Wed, Jan 26, 2011 at 08:58, Mck m...@apache.org wrote:
> > You are correct that microseconds would be better, but for the test it
> > doesn't matter that much.

> Have you tried? I'm very new to Cassandra as well, and always uncertain
> as to what to expect...

IMHO it's a matter of use-case. In my use-case there is no possibility
for two (or more) processes to write/update the same key, so
milliseconds are fine for me.

BTW, how do you get the current time in microseconds in Java?

> As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..)
> goes, won't this hurt performance? Normally I would _always_ agree that
> a defensive copy of an array/collection argument should be stored, but
> has this intentionally not been done (or should it be) because of large
> reduce jobs (millions of records) and the performance impact there?

The size of the queue is configured via
ColumnFamilyOutputFormat.QUEUE_SIZE and defaults to 32 *
Runtime.getRuntime().availableProcessors(). So the queue is not too
large, and I'd say performance shouldn't get hurt.
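For reference, a minimal sketch of that lookup, assuming the 0.7-era
ColumnFamilyOutputFormat where QUEUE_SIZE is a public configuration key:

    Configuration conf = job.getConfiguration();
    // queue feeding the background batch_mutate() writer; the default
    // scales with the number of cores, as described above
    int queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE,
            32 * Runtime.getRuntime().availableProcessors());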

 --
Patrik


Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-26 Thread Mck
On Wed, 2011-01-26 at 12:13 +0100, Patrik Modesto wrote:
> BTW, how do you get the current time in microseconds in Java?

I'm using HFactory.clock() (from hector).
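For anyone avoiding the hector dependency, a minimal sketch of a
microsecond clock (MicrosecondClock is a made-up name, not a standard
API; Java has no direct microsecond wall clock, so this derives one from
currentTimeMillis() and bumps on collisions to keep timestamps unique):

    import java.util.concurrent.atomic.AtomicLong;

    public final class MicrosecondClock
    {
        private static final AtomicLong last = new AtomicLong();

        // strictly increasing microseconds-since-epoch
        public static long now()
        {
            while (true) {
                long micros = System.currentTimeMillis() * 1000;
                long prev = last.get();
                long next = Math.max(micros, prev + 1);
                if (last.compareAndSet(prev, next)) {
                    return next;
                }
            }
        }
    }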

> > As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..)
> > goes, won't this hurt performance?

> The size of the queue is configured via
> ColumnFamilyOutputFormat.QUEUE_SIZE and defaults to 32 *
> Runtime.getRuntime().availableProcessors(). So the queue is not too
> large, and I'd say performance shouldn't get hurt.

This is only the default.
I'm running w/ 8. Testing has given this the best throughput for me
when processing 25+ million rows...
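For example, assuming QUEUE_SIZE is the same public configuration key as
above, pinning it to 8 looks something like:

    job.getConfiguration().setInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 8);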

In the end it is still 25+ million .clone(..) calls. 

> The key isn't the only potential live byte[]. You also have names and
> values in all the columns (and supercolumns) for all the mutations.

Now make that over a billion .clone(..) calls... :-(

byte[] copies are relatively quick and cheap; still, I am seeing a
degradation in m/r reduce performance with the cloning of keys.
It's not that you don't have my vote here, I'm just stating my
uncertainty about what the correct API should be.

~mck




Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-26 Thread Jonathan Ellis
On Tue, Jan 25, 2011 at 12:09 PM, Mick Semb Wever m...@apache.org wrote:
> Well, your key is a mutable Text object, so I can see some possibility
> depending on how hadoop uses these objects.

Yes, that's it exactly.  We recently fixed a bug in the demo
word_count program for this. Now we do
ByteBuffer.wrap(Arrays.copyOf(text.getBytes(), text.getLength())).
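That is, a sketch of the fix (not the verbatim word_count source):

    // Text.getBytes() returns a reused backing array whose valid region
    // is [0, getLength()), so copy that region before wrapping it.
    byte[] raw = text.getBytes();                  // array is reused by Hadoop
    ByteBuffer key = ByteBuffer.wrap(Arrays.copyOf(raw, text.getLength()));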

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com


Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-25 Thread Mick Semb Wever
On Tue, 2011-01-25 at 09:37 +0100, Patrik Modesto wrote:
> While developing a really simple MR task, I've found that a
> combination of Hadoop optimization and the Cassandra
> ColumnFamilyRecordWriter queue creates wrong keys to send to
> batch_mutate().

I've seen similar behaviour (junk rows being written), although my keys
are always a result from
  LongSerializer.get().toByteBuffer(key)
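Note that (assuming hector's LongSerializer) toByteBuffer(..) allocates
a fresh ByteBuffer per call, so there should be no shared backing array
to clobber, which makes the junk rows all the more surprising:

    ByteBuffer bb = LongSerializer.get().toByteBuffer(42L); // fresh buffer each call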


I'm interested in looking into it - but can you provide a code example?

From what I can see, TextOutputFormat.LineRecordWriter.write(..)
doesn't clone anything, but it does write it out immediately.
While ColumnFamilyRecordWriter does batch the mutations up as you say,
it takes a ByteBuffer as a key, so why/how are you re-using this
client-side (aren't you creating a new ByteBuffer on each call to
write(..))?

~mck

-- 
Never let your sense of morals get in the way of doing what's right.
Isaac Asimov 
| http://semb.wever.org | http://sesat.no
| http://finn.no   | Java XSS Filter





Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-25 Thread Patrik Modesto
Hi Mick,

attached is a very simple MR job that deletes expired URLs from my
test Cassandra DB. The keyspace looks like this:

Keyspace: Test:
  Replication Strategy: org.apache.cassandra.locator.SimpleStrategy
Replication Factor: 2
  Column Families:
ColumnFamily: Url2
  Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
  Row cache size / save period: 0.0/0
  Key cache size / save period: 20.0/3600
  Memtable thresholds: 4.7015625/1003/60
  GC grace seconds: 864000
  Compaction min/max thresholds: 4/32
  Read repair chance: 1.0
  Built indexes: []

In the CF the key is a URL and inside there is some data. My MR job
needs just expire_date, which is an int64 timestamp. For now I store it
as a string because I use Python and C++ to manipulate the data as
well.

For the MR Job to run you need a patch I did. You can find it here:
https://issues.apache.org/jira/browse/CASSANDRA-2014

The attached file contains the working version with the cloned key in
the reduce() method. My other approach was:
[code]
context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
Collections.singletonList(getMutation(key)));
[/code]
which produces junk keys.

Best regards,
Patrik

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.*;

import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.avro.Deletion;
import org.apache.cassandra.avro.SliceRange;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ContextExpirator extends Configured implements Tool
{
    static final String KEYSPACE = "Test";
    static final String COLUMN_FAMILY = "Url2";
    static final String OUTPUT_COLUMN_FAMILY = "Url2";
    static final String COLUMN_VALUE = "expire_date";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new ContextExpirator(), args);
        System.exit(0);
    }

    public static class UrlFilterMapper
        extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, NullWritable>
    {
        private final static NullWritable nic = NullWritable.get();
        private ByteBuffer sourceColumn;
        private static long now;

        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
            throws IOException, InterruptedException
        {
            sourceColumn = ByteBuffer.wrap(COLUMN_VALUE.getBytes());
            now = System.currentTimeMillis() / 1000; // convert from ms
        }

        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
            throws IOException, InterruptedException
        {
            IColumn column = columns.get(sourceColumn);
            if (column == null) {
                return;
            }

            Text tKey = new Text(ByteBufferUtil.string(key));
            Long value = Long.decode(ByteBufferUtil.string(column.value()));

            if (now > value) { // the URL has expired
                context.write(tKey, nic);
            }
        }
    }

    public static class RemoveUrlReducer
        extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>>
    {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException
        {
            ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
            context.write(bbKey, Collections.singletonList(getMutation()));
        }

        private static Mutation getMutation()
        {
            Deletion d = new Deletion();
            d.timestamp = System.currentTimeMillis();

            Mutation m = new Mutation();
            m.deletion = d;

            return m;
        }
    }

    public int run(String[] args) throws Exception
    {
        Job job = new Job(getConf(), "context_expitator");
        job.setJarByClass(ContextExpirator.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);

        job.setMapperClass(UrlFilterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
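[The attachment is truncated here in the archive. A plausible completion
of run(), modelled on the contrib/word_count example rather than taken
from the original file, might be:]

        job.setReducerClass(RemoveUrlReducer.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);

        // only fetch the expire_date column in the mapper
        SlicePredicate predicate = new SlicePredicate().setColumn_names(
                Collections.singletonList(ByteBuffer.wrap(COLUMN_VALUE.getBytes())));
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        job.waitForCompletion(true);
        return 0;
    }
}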


Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-25 Thread Mick Semb Wever
On Tue, 2011-01-25 at 14:16 +0100, Patrik Modesto wrote:
> The attached file contains the working version with the cloned key in
> the reduce() method. My other approach was:
>
>  context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
>  Collections.singletonList(getMutation(key)));
>
> which produces junk keys.

In fact I have another problem (trying to write an empty byte[], or
something, as a key), which puts one whole row out of whack (one row in
25 million...).

But i'm debugging along the same code.

I don't quite understand how the byte[] in
ByteBuffer.wrap(key.getBytes(),...)
gets clobbered.
Well, your key is a mutable Text object, so I can see some possibility
depending on how hadoop uses these objects.
Is there something to ByteBuffer.allocate(..) I'm missing...
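A small illustration of the suspected mechanism (my own sketch, assuming
hadoop's usual object reuse, where each new key is deserialized into the
same Text instance):

    Text key = new Text("key-one");
    // wrap(..) does not copy; the buffer still points at key's backing array
    ByteBuffer queued = ByteBuffer.wrap(key.getBytes(), 0, key.getLength());
    key.set("key-two"); // what hadoop effectively does before the queue drains
    // 'queued' now reads "key-two": the queued key has been clobbered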

btw.
 is d.timestamp = System.currentTimeMillis(); ok?
 shouldn't this be microseconds so that each mutation has a different
timestamp? http://wiki.apache.org/cassandra/DataModel


~mck


-- 
As you go the way of life, you will see a great chasm. Jump. It is not
as wide as you think. Native American Initiation Rite 
| http://semb.wever.org | http://sesat.no
| http://finn.no   | Java XSS Filter





Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-25 Thread Patrik Modesto
On Tue, Jan 25, 2011 at 19:09, Mick Semb Wever m...@apache.org wrote:

> In fact I have another problem (trying to write an empty byte[], or
> something, as a key), which puts one whole row out of whack (one row in
> 25 million...).
>
> But I'm debugging along the same code.
>
> I don't quite understand how the byte[] in
> ByteBuffer.wrap(key.getBytes(),...)
> gets clobbered.

A code snippet would help here.

> Well, your key is a mutable Text object, so I can see some possibility
> depending on how hadoop uses these objects.
> Is there something to ByteBuffer.allocate(..) I'm missing...

I don't know, I'm quite new to Java (but with a long C++ history).

> btw.
>  is d.timestamp = System.currentTimeMillis(); ok?
>  shouldn't this be microseconds so that each mutation has a different
> timestamp? http://wiki.apache.org/cassandra/DataModel

You are correct that microseconds would be better, but for the test it
doesn't matter that much.

Patrik


Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse

2011-01-25 Thread Mck

> >  is d.timestamp = System.currentTimeMillis(); ok?
>
> You are correct that microseconds would be better, but for the test it
> doesn't matter that much.

Have you tried? I'm very new to Cassandra as well, and always uncertain
as to what to expect...


> ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0,
> key.getLength()));

An alternative approach to your client-side cloning is 

  ByteBuffer bbKey = ByteBuffer.wrap(key.toString().getBytes(UTF_8)); 

Here at least it is obvious you are passing in the bytes from an immutable 
object.

As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..)
goes, won't this hurt performance? Normally I would _always_ agree that
a defensive copy of an array/collection argument should be stored, but
has this intentionally not been done (or should it be) because of large
reduce jobs (millions of records) and the performance impact there?
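For concreteness, a hedged sketch (not the actual ColumnFamilyRecordWriter
source) of what such a defensive copy inside write(..) could look like,
where enqueue(..) stands in for the real queueing step:

    public void write(ByteBuffer key, List<Mutation> mutations) throws IOException
    {
        // copy up front so callers may reuse their buffers; the copy is
        // what waits in the queue until batch_mutate() drains it
        ByteBuffer keyCopy = ByteBufferUtil.clone(key);
        enqueue(keyCopy, mutations); // hypothetical enqueue step
    }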

The key isn't the only potential live byte[]. You also have names and
values in all the columns (and supercolumns) for all the mutations.


~mck