Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Tue, Jan 25, 2011 at 12:09 PM, Mick Semb Wever wrote:
> Well your key is a mutable Text object, so i can see some possibility
> depending on how hadoop uses these objects.

Yes, that's it exactly. We recently fixed a bug in the demo word_count
program for this. Now we do
ByteBuffer.wrap(Arrays.copyOf(text.getBytes(), text.getLength())).

-- 
Jonathan Ellis
Project Chair, Apache Cassandra
co-founder of DataStax, the source for professional Cassandra support
http://www.datastax.com
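For reference, here is that copy in a little more context. A minimal sketch,
where the class name and the URL literal are only illustrative:

[code]
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.io.Text;

public class CopyKeyExample
{
    public static void main(String[] args)
    {
        // In a real reducer this would be the reused Text key argument.
        Text text = new Text("http://example.org/some-url");

        // Text.getBytes() exposes the writable's reused backing array (often
        // longer than the logical value), so copy exactly getLength() bytes
        // into a fresh array before wrapping it for Cassandra.
        ByteBuffer key = ByteBuffer.wrap(Arrays.copyOf(text.getBytes(), text.getLength()));

        System.out.println(key.remaining() + " bytes, detached from the Text buffer");
    }
}
[/code]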
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Wed, 2011-01-26 at 12:13 +0100, Patrik Modesto wrote:
> BTW how to get current time in microseconds in Java?

I'm using HFactory.clock() (from hector).

> > As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..),
> > won't this hurt performance?
>
> The size of the queue is computed at runtime:
> ColumnFamilyOutputFormat.QUEUE_SIZE, 32 *
> Runtime.getRuntime().availableProcessors()
> So the queue is not too large, so I'd say the performance shouldn't get hurt.

This is only the default. I'm running w/ 8. Testing has given this the best
throughput for me when processing 25+ million rows...

In the end it is still 25+ million .clone(..) calls.

> The key isn't the only potential live byte[]. You also have names and
> values in all the columns (and supercolumns) for all the mutations.

Now make that over a billion .clone(..) calls... :-(

byte[] copies are relatively quick and cheap, but i am still seeing a
degradation in m/r reduce performance with cloning of keys.

It's not that you don't have my vote here, i'm just stating my uncertainty
on what the correct API should be.

~mck
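For those who want to stay on the plain JDK: a minimal sketch of the usual
approximation, assuming millisecond precision scaled to microsecond units is
good enough for your ordering needs (Java has no true microsecond wall clock,
which is why client libraries such as hector ship their own clocks):

[code]
public final class Micros
{
    private Micros() {}

    // Millisecond wall clock scaled to microsecond units. Mutations created
    // within the same millisecond still collide, so if sub-millisecond
    // ordering matters, use a client-side clock (e.g. hector's) instead.
    public static long currentTimeMicros()
    {
        return System.currentTimeMillis() * 1000L;
    }
}
[/code]

A deletion would then set d.timestamp = Micros.currentTimeMicros() instead of
System.currentTimeMillis().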
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Wed, Jan 26, 2011 at 08:58, Mck wrote:
> > You are correct that microseconds would be better, but for the test it
> > doesn't matter that much.
>
> Have you tried? I'm very new to cassandra as well, and always uncertain
> as to what to expect...

IMHO it's a matter of use-case. In my use-case there is no possibility for
two (or more) processes to write/update the same key, so milliseconds are
fine for me. BTW how to get current time in microseconds in Java?

> As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..),
> won't this hurt performance? Normally i would _always_ agree that a
> defensive copy of an array/collection argument be stored, but has this
> intentionally not been done (or should it) because of large reduce jobs
> (millions of records) and the performance impact here.

The size of the queue is computed at runtime:

  ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()

So the queue is not too large, so I'd say the performance shouldn't get hurt.

-- 
Patrik
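For what it's worth, here is a sketch of how a job could shrink that queue.
This assumes QUEUE_SIZE is a publicly accessible Hadoop configuration
property (which is what the default lookup above suggests); the value 8 is
only an example:

[code]
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class QueueSizeExample
{
    // A smaller queue bounds how many queued mutations can still reference
    // live (reusable) buffers at once, at the cost of more frequent flushes.
    static void shrinkMutationQueue(Job job)
    {
        // Default when nothing is configured: 32 slots per available core.
        int defaultSize = 32 * Runtime.getRuntime().availableProcessors();
        System.out.println("default queue size would be " + defaultSize);

        job.getConfiguration().setInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 8);
    }
}
[/code]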
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
> > is "d.timestamp = System.currentTimeMillis();" ok?
>
> You are correct that microseconds would be better, but for the test it
> doesn't matter that much.

Have you tried? I'm very new to cassandra as well, and always uncertain
as to what to expect...

> ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));

An alternative approach to your client-side cloning is

  ByteBuffer bbKey = ByteBuffer.wrap(key.toString().getBytes(UTF_8));

Here at least it is obvious you are passing in the bytes from an immutable
object.

As far as moving the clone(..) into ColumnFamilyRecordWriter.write(..),
won't this hurt performance? Normally i would _always_ agree that a
defensive copy of an array/collection argument be stored, but has this
intentionally not been done (or should it) because of large reduce jobs
(millions of records) and the performance impact here.

The key isn't the only potential live byte[]. You also have names and
values in all the columns (and supercolumns) for all the mutations.

~mck
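Spelled out with the UTF_8 constant made explicit (the class and method names
are only illustrative):

[code]
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.apache.hadoop.io.Text;

public class ImmutableKeyExample
{
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    // toString() snapshots the mutable Text into an immutable String, so the
    // byte[] returned by getBytes(UTF_8) is private to this call and stays
    // valid while the mutation waits in ColumnFamilyRecordWriter's queue.
    static ByteBuffer toKey(Text key)
    {
        return ByteBuffer.wrap(key.toString().getBytes(UTF_8));
    }
}
[/code]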
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Tue, Jan 25, 2011 at 19:09, Mick Semb Wever wrote:
> In fact i have another problem (trying to write an empty byte[], or
> something, as a key, which put one whole row out of whack, ((one row in
> 25 million...))).
>
> But i'm debugging along the same code.
>
> I don't quite understand how the byte[] in
> ByteBuffer.wrap(key.getBytes(),...)
> gets clobbered.

Code snippet would help here.

> Well your key is a mutable Text object, so i can see some possibility
> depending on how hadoop uses these objects.
> Is there something to ByteBuffer.allocate(..) i'm missing...

I don't know, I'm quite new to Java (but with a long C++ history).

> btw.
> is "d.timestamp = System.currentTimeMillis();" ok?
> shouldn't this be microseconds so that each mutation has a different
> timestamp? http://wiki.apache.org/cassandra/DataModel

You are correct that microseconds would be better, but for the test it
doesn't matter that much.

Patrik
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Tue, 2011-01-25 at 14:16 +0100, Patrik Modesto wrote:
> The attached file contains the working version with the cloned key in
> the reduce() method. My other approach was:
>
> > context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
> >     Collections.singletonList(getMutation(key)));
>
> Which produces junk keys.

In fact i have another problem (trying to write an empty byte[], or
something, as a key, which put one whole row out of whack, ((one row in
25 million...))).

But i'm debugging along the same code.

I don't quite understand how the byte[] in
ByteBuffer.wrap(key.getBytes(),...)
gets clobbered.

Well your key is a mutable Text object, so i can see some possibility
depending on how hadoop uses these objects.
Is there something to ByteBuffer.allocate(..) i'm missing...

btw.
is "d.timestamp = System.currentTimeMillis();" ok?
shouldn't this be microseconds so that each mutation has a different
timestamp? http://wiki.apache.org/cassandra/DataModel

~mck
-- 
"As you go the way of life, you will see a great chasm. Jump. It is not as
wide as you think." Native American Initiation Rite

| http://semb.wever.org | http://sesat.no | http://finn.no | Java XSS Filter
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
Hi Mick,

attached is the very simple MR job that deletes expired URLs from my test
Cassandra DB. The keyspace looks like this:

Keyspace: Test:
  Replication Strategy: org.apache.cassandra.locator.SimpleStrategy
    Replication Factor: 2
  Column Families:
    ColumnFamily: Url2
      Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
      Row cache size / save period: 0.0/0
      Key cache size / save period: 20.0/3600
      Memtable thresholds: 4.7015625/1003/60
      GC grace seconds: 864000
      Compaction min/max thresholds: 4/32
      Read repair chance: 1.0
      Built indexes: []

In the CF the key is the URL and inside there are some data. My MR job needs
just "expire_date", which is an int64 timestamp. For now I store it as a
string because I use Python and C++ to manipulate the data as well.

For the MR job to run you need a patch I did. You can find it here:
https://issues.apache.org/jira/browse/CASSANDRA-2014

The attached file contains the working version with the cloned key in
the reduce() method. My other approach was:

[code]
context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
              Collections.singletonList(getMutation(key)));
[/code]

Which produces junk keys.

Best regards,
Patrik


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.*;

import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.avro.Deletion;
import org.apache.cassandra.avro.SliceRange;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ContextExpirator extends Configured implements Tool
{
    static final String KEYSPACE = "Test";
    static final String COLUMN_FAMILY = "Url2";
    static final String OUTPUT_COLUMN_FAMILY = "Url2";
    static final String COLUMN_VALUE = "expire_date";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new ContextExpirator(), args);
        System.exit(0);
    }

    public static class UrlFilterMapper
        extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, NullWritable>
    {
        private final static NullWritable nic = NullWritable.get();
        private ByteBuffer sourceColumn;
        private static long now;

        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
            throws IOException, InterruptedException
        {
            sourceColumn = ByteBuffer.wrap(COLUMN_VALUE.getBytes());
            now = System.currentTimeMillis() / 1000; // convert from ms
        }

        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
            throws IOException, InterruptedException
        {
            IColumn column = columns.get(sourceColumn);
            if (column == null) {
                return;
            }

            Text tKey = new Text(ByteBufferUtil.string(key));
            Long value = Long.decode(ByteBufferUtil.string(column.value()));

            if (now > value) {
                context.write(tKey, nic);
            }
        }
    }

    public static class RemoveUrlReducer
        extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>>
    {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException
        {
            ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
            context.write(bbKey, Collections.singletonList(getMutation()));
        }

        private static Mutation getMutation()
        {
            Deletion d = new Deletion();
            d.timestamp = System.currentTimeMillis();

            Mutation m = new Mutation();
            m.deletion = d;

            return m;
        }
    }

    public int run(String[] args) throws Exception
    {
        Job job = new Job(getConf(), "context_expitator");
        job.setJarByClass(ContextExpirator.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);

        job.setMapperClass(UrlFilterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(RemoveUrlReducer.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.cla
Re: [mapreduce] ColumnFamilyRecordWriter hidden reuse
On Tue, 2011-01-25 at 09:37 +0100, Patrik Modesto wrote:
> While developing a really simple MR task, I've found that a
> combination of Hadoop optimization and the Cassandra
> ColumnFamilyRecordWriter queue creates wrong keys to send to
> batch_mutate().

I've seen similar behaviour (junk rows being written), although my keys are
always a result from LongSerializer.get().toByteBuffer(key)

i'm interested in looking into it - but can you provide a code example?

From what i can see TextOutputFormat.LineRecordWriter.write(..) doesn't
clone anything, but it does write it out immediately.

While ColumnFamilyRecordWriter does batch the mutations up as you say, it
takes a ByteBuffer as a key, so why/how are you re-using this client-side
(aren't you creating a new ByteBuffer each call to write(..))?

~mck
-- 
"Never let your sense of morals get in the way of doing what's right."
Isaac Asimov

| http://semb.wever.org | http://sesat.no | http://finn.no | Java XSS Filter
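To make the failure mode concrete: a brand-new ByteBuffer per write(..) does
not help when it merely wraps the Text key's backing array, because the
reduce framework refills that same array with the next key while the mutation
is still sitting in ColumnFamilyRecordWriter's queue. A self-contained
illustration (the URLs and class name are made up):

[code]
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.apache.hadoop.io.Text;

public class ReuseIllustration
{
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    public static void main(String[] args)
    {
        Text key = new Text("http://first.example/some/long/path");

        // What a reducer might hand to ColumnFamilyRecordWriter.write(..):
        // a fresh ByteBuffer, but one that shares the Text's backing array.
        ByteBuffer queued = ByteBuffer.wrap(key.getBytes(), 0, key.getLength());

        // Simulate Hadoop reusing the Text instance for the next (shorter)
        // key; set(byte[], ..) refills the existing backing array in place.
        byte[] next = "http://second.example/x".getBytes(UTF_8);
        key.set(next, 0, next.length);

        // The queued buffer now reads as the second URL followed by the tail
        // of the first one - the same mixed "junk" keys described below.
        System.out.println(UTF_8.decode(queued.duplicate()).toString());
    }
}
[/code]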
[mapreduce] ColumnFamilyRecordWriter hidden reuse
Hi,

I play with Cassandra 0.7.0 and Hadoop, developing simple MapReduce tasks.
While developing a really simple MR task, I've found that a combination of
Hadoop optimization and the Cassandra ColumnFamilyRecordWriter queue creates
wrong keys to send to batch_mutate().

The problem is in the reduce part: the storage behind the key parameter is
reused. For example when storing URLs I'll get:

http://119.cz/index.php/vypalovaci-mechaniky-a-vypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista (1)
http://11superstars.xf.cz/index.php?page=12y-a-vypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista (2)
http://12kmenu.unas.cz/18-6-2011-(Isachar).htmlvypalovani-disk/120-jak-zjistit-verzi-firmwaru-vypalovaky-ve-windows-vista (3)

You can see that part of URL (1) repeats in URL (2) and URL (3).

I've changed my reduce method to clone the key before calling
context.write(), but I think it should be cloned inside the Cassandra
ColumnFamilyRecordWriter, because as a user I don't care how it is
implemented inside, I just write values there. With FileOutputFormat, for
example, I don't need to clone the key when writing to it.

I'd like to know what's your opinion.

Best regards,
Patrik
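A boiled-down sketch of that client-side workaround (getMutation() stands in
for whatever Mutation the job builds; a complete job is attached elsewhere in
this thread):

[code]
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public abstract class CloningReducer
    extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>>
{
    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException
    {
        // Copy the key's bytes before queueing, so the mutation no longer
        // points at the Text backing array that Hadoop reuses for the next key.
        ByteBuffer bbKey = ByteBufferUtil.clone(
                ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));

        context.write(bbKey, Collections.singletonList(getMutation()));
    }

    // Hypothetical hook: build the Mutation for this key.
    protected abstract Mutation getMutation();
}
[/code]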