[ https://issues.apache.org/jira/browse/CASSANDRA-1101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stu Hood updated CASSANDRA-1101:
--------------------------------

    Attachment: 1101-clock-fix.diff

Hey Karthick, sorry for the additional delay in reviewing this. I'm attaching one patch I had to apply to get it to build against trunk, but this should be the last round of review.

* ColumnWritable implements byte[] comparison itself, but should use o.a.c.utils.FBUtilities.compareByteArrays
* ColumnWritable implements equality in terms of reference equality: is that intentional?
* ColumnWritable assumes both name and value can be converted to Strings, which is not safe: use FBUtilities.(write|read)ByteArray instead
* Why add EmbeddedServer rather than using EmbeddedCassandraService?

Thanks a ton for your work!

> A Hadoop Output Format That Targets Cassandra
> ---------------------------------------------
>
> Key: CASSANDRA-1101
> URL: https://issues.apache.org/jira/browse/CASSANDRA-1101
> Project: Cassandra
> Issue Type: New Feature
> Components: Hadoop
> Affects Versions: 0.6.1
> Reporter: Karthick Sankarachary
> Assignee: Stu Hood
> Attachments: 1101-clock-fix.diff, CASSANDRA-1101-V1.patch,
> CASSANDRA-1101-V2.patch, CASSANDRA-1101-V3.patch, CASSANDRA-1101.patch
>
>
> Currently, there exists a Hadoop-specific input format (viz.,
> ColumnFamilyInputFormat) that allows one to iterate over the rows in a given
> Cassandra column family and treat them as the input to a Hadoop map task. By
> the same token, one may need to feed the output of a Hadoop reduce task into
> a Cassandra column family, for which no mechanism exists today. This calls
> for the definition of a Hadoop-specific output format that accepts a key and
> its columns, and writes them out to a given column family.
>
> Here, we describe an output format known as ColumnFamilyOutputFormat, which
> allows reduce tasks to persist keys and their associated columns as Cassandra
> rows in a given column family.
> By default, it prevents overwriting existing rows in the column family by
> ensuring, at initialization time, that the column family contains no rows
> matching the given slice predicate. For the sake of speed, it employs a lazy
> write-back caching mechanism. Its record writer batches the mutations created
> from the <key, value> pairs the reducer emits (in a task-specific map), but
> it stops short of actually mutating the rows. That responsibility falls on
> the output committer, which makes the changes official by sending a batch
> mutate request to Cassandra.
>
> The record writer, which is called ColumnFamilyRecordWriter, maps the input
> <key, value> pairs to a Cassandra column family. In particular, it creates a
> mutation for each column in the value and associates it with the key and, in
> turn, with the endpoint responsible for that key. Because round trips to the
> server are fairly expensive, it merely batches the mutations in memory and
> leaves it to the output committer to send them to the server. Furthermore,
> the writer groups the mutations by the endpoint responsible for the rows
> being affected. This allows the output committer to execute the mutations
> in parallel, on an endpoint-by-endpoint basis.
>
> The output committer, which is called ColumnFamilyOutputCommitter, traverses
> the mutations collected by the record writer and sends them to the endpoints
> responsible for them. The full set of mutations is partitioned by endpoint,
> and each partition can be sent independently. The committer can therefore
> commit the mutations using multiple threads, one per endpoint. This reduces
> the time it takes to propagate the mutations to the cluster for two reasons.
> First, the client eliminates one network hop that the server would otherwise
> have had to make to forward each mutation to its responsible node. Second,
> each endpoint node has to handle only a subset of the total set of mutations.
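The split between the record writer and the output committer can be sketched in plain Java. The record writer buffers mutations grouped by endpoint, and the output committer sends one batch per endpoint, each on its own thread. This is a minimal, hypothetical sketch rather than the patch's code. Column, EndpointSender and endpointFor are illustrative stand-ins for Thrift mutations, the batch mutate call and Cassandra's token-ring lookup. endpointFor simply hashes the key onto a fixed endpoint list, which a real partitioner does not do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EndpointBatchingSketch {

    /** Hypothetical column to write; stands in for a Thrift mutation. */
    static final class Column {
        final String name, value;
        Column(String name, String value) { this.name = name; this.value = value; }
        @Override public String toString() { return name + "=" + value; }
    }

    /** Hypothetical stand-in for one batch mutate request to one endpoint. */
    interface EndpointSender {
        int send(String endpoint, Map<String, List<Column>> rows) throws Exception;
    }

    private final List<String> endpoints;
    // endpoint -> (row key -> buffered columns); nothing is sent before commit()
    private final Map<String, Map<String, List<Column>>> batches = new LinkedHashMap<>();

    EndpointBatchingSketch(List<String> endpoints) { this.endpoints = endpoints; }

    /** Hypothetical ring lookup: hashes the key onto the endpoint list. */
    String endpointFor(String key) {
        return endpoints.get(Math.floorMod(key.hashCode(), endpoints.size()));
    }

    /** Record-writer step: buffer columns under the endpoint that owns the key. */
    void write(String key, List<Column> columns) {
        batches.computeIfAbsent(endpointFor(key), e -> new LinkedHashMap<>())
               .computeIfAbsent(key, k -> new ArrayList<>())
               .addAll(columns);
    }

    /** Committer step: one thread per endpoint, each sending only its own batch. */
    int commit(EndpointSender sender) throws Exception {
        if (batches.isEmpty()) return 0;
        ExecutorService pool = Executors.newFixedThreadPool(batches.size());
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (Map.Entry<String, Map<String, List<Column>>> e : batches.entrySet()) {
                results.add(pool.submit(() -> sender.send(e.getKey(), e.getValue())));
            }
            int total = 0;
            for (Future<Integer> f : results) total += f.get(); // rethrows any send failure
            batches.clear(); // only reached if every endpoint succeeded
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        EndpointBatchingSketch writer =
            new EndpointBatchingSketch(List.of("10.0.0.1", "10.0.0.2", "10.0.0.3"));
        writer.write("row1", List.of(new Column("a", "1"), new Column("b", "2")));
        writer.write("row2", List.of(new Column("a", "3")));
        writer.write("row1", List.of(new Column("c", "4")));

        // Counts columns instead of calling Cassandra; a real committer would send them.
        int sent = writer.commit((endpoint, rows) -> {
            int n = 0;
            for (List<Column> cols : rows.values()) n += cols.size();
            System.out.println(endpoint + " <- " + rows);
            return n;
        });
        System.out.println("columns committed: " + sent);
    }
}
```

In this sketch the buffer is cleared only after every endpoint's send succeeds, so a failed commit leaves the batches in place for a retry.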
>
> For convenience, we also define a default reduce task, called
> ColumnFamilyOutputReducer, which collects the columns in the input value and
> maps them to a data structure expected by Cassandra. By default, it assumes
> the input value to be a ColumnWritable, which denotes a name/value pair
> corresponding to a single column. This reduce task is in turn used by the
> attached test case, which maps every <key, value> pair in a sample input
> sequence file to a <key, column> pair and then reduces them by aggregating
> the columns that share the same key. Finally, the batched <key, columns>
> pairs are written to the column family associated with the output format.
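The review comments above ask ColumnWritable to make three changes:

* compare names and values as raw bytes,
* serialize them without converting them to Strings,
* define equality by content rather than by reference.

A minimal sketch of such a class follows. ColumnWritableSketch is a hypothetical name, not the patch's class. compareBytes, writeBytes and readBytes are local stand-ins assumed to behave like FBUtilities.compareByteArrays and FBUtilities.(write|read)ByteArray: unsigned lexicographic order, and an int length prefix followed by the bytes. Check them against the real utilities before relying on the byte layout. write/readFields follow the shape of Hadoop's Writable interface without depending on Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: a name/value column that keeps raw bytes and never converts them to Strings.
// write/readFields mirror Hadoop's Writable shape, but Hadoop is not a dependency here.
public class ColumnWritableSketch implements Comparable<ColumnWritableSketch> {
    private byte[] name = new byte[0];
    private byte[] value = new byte[0];

    public ColumnWritableSketch() {} // no-arg constructor for readFields-style deserialization

    public ColumnWritableSketch(byte[] name, byte[] value) {
        this.name = name.clone();
        this.value = value.clone();
    }

    public byte[] getValue() { return value.clone(); }

    // Assumed stand-in for FBUtilities.compareByteArrays: unsigned bytes, lexicographic,
    // and a strict prefix sorts before the longer array.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    // Assumed stand-ins for FBUtilities.(write|read)ByteArray: an int length, then the bytes.
    static void writeBytes(DataOutput out, byte[] bytes) throws IOException {
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    static byte[] readBytes(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }

    public void write(DataOutput out) throws IOException {
        writeBytes(out, name);
        writeBytes(out, value);
    }

    public void readFields(DataInput in) throws IOException {
        name = readBytes(in);
        value = readBytes(in);
    }

    @Override
    public int compareTo(ColumnWritableSketch o) {
        int cmp = compareBytes(name, o.name);
        return cmp != 0 ? cmp : compareBytes(value, o.value);
    }

    // Content equality rather than reference equality, consistent with compareTo and hashCode.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof ColumnWritableSketch)) return false;
        ColumnWritableSketch o = (ColumnWritableSketch) obj;
        return Arrays.equals(name, o.name) && Arrays.equals(value, o.value);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(name) + Arrays.hashCode(value);
    }

    public static void main(String[] args) throws IOException {
        // Not valid UTF-8, so a String round-trip could corrupt it; the byte path cannot.
        byte[] raw = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF};
        ColumnWritableSketch col =
            new ColumnWritableSketch("name".getBytes(StandardCharsets.UTF_8), raw);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        col.write(new DataOutputStream(buf));
        ColumnWritableSketch copy = new ColumnWritableSketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(col.equals(copy));                    // true
        System.out.println(Arrays.equals(raw, copy.getValue())); // true
    }
}
```

Making compareTo, equals and hashCode agree matters if ColumnWritable is used as a map or shuffle key. Two columns with the same bytes are then treated as the same column regardless of which object instance carries them.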