A Hadoop Output Format That Targets Cassandra
---------------------------------------------

                 Key: CASSANDRA-1101
                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1101
             Project: Cassandra
          Issue Type: New Feature
          Components: Hadoop
    Affects Versions: 0.6.1
            Reporter: Karthick Sankarachary


Currently, there exists a Hadoop-specific input format (viz., 
ColumnFamilyInputFormat) that allows one to iterate over the rows in a given 
Cassandra column family and treat them as the input to a Hadoop map task. By 
the same token, one may need to feed the output of a Hadoop reduce task into a 
Cassandra column family, for which no mechanism exists today. This calls for a 
Hadoop-specific output format that accepts a key and its associated columns and 
writes them out to a given column family.

Here, we describe an output format known as ColumnFamilyOutputFormat, which 
allows reduce tasks to persist keys and their associated columns as Cassandra 
rows in a given column family. By default, it prevents overwriting existing 
rows in the column family by ensuring, at initialization time, that the column 
family contains no rows matching the given slice predicate. For the sake of 
speed, it employs a lazy write-back caching mechanism: its record writer 
batches the mutations derived from the reduce task's inputs (in a task-specific 
map) but stops short of actually mutating the rows. That responsibility falls 
to its output committer, which makes the changes official by sending a batch 
mutate request to Cassandra.
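
To make this concrete, here is a rough sketch (not the attached patch) of how 
such an output format would plug into Hadoop's OutputFormat contract. The key 
and value types (Text and a list of Thrift Columns) and the configuration 
property names are assumptions made for illustration; the record writer and 
output committer it hands back are sketched further below.

{code:java}
import java.io.IOException;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ColumnFamilyOutputFormat extends OutputFormat<Text, List<Column>> {

    // Hypothetical configuration keys naming the target keyspace/column family.
    public static final String OUTPUT_KEYSPACE = "cassandra.output.keyspace";
    public static final String OUTPUT_COLUMN_FAMILY = "cassandra.output.columnfamily";

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException {
        // Fail fast if the job is not pointed at a keyspace/column family. The
        // actual format would also verify here that the target column family
        // contains no rows matching the configured slice predicate.
        if (context.getConfiguration().get(OUTPUT_KEYSPACE) == null
                || context.getConfiguration().get(OUTPUT_COLUMN_FAMILY) == null)
            throw new IOException("output keyspace and column family must be configured");
    }

    @Override
    public RecordWriter<Text, List<Column>> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Batches mutations in memory, grouped by the responsible endpoint.
        return new ColumnFamilyRecordWriter(context);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Flushes the batched mutations, one batch mutate request per endpoint.
        return new ColumnFamilyOutputCommitter(context);
    }
}
{code}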

The record writer, called ColumnFamilyRecordWriter, maps the input <key, value> 
pairs to a Cassandra column family. In particular, it creates a mutation for 
each column in the value, which it then associates with the key and, in turn, 
with the endpoint responsible for that key. Note that, since round trips to the 
server are fairly expensive, it merely batches the mutations in memory and 
leaves it to the output committer to send them to the server. Furthermore, the 
writer groups the mutations by the endpoint responsible for the rows being 
affected, which allows the output committer to execute the mutations in 
parallel, on an endpoint-by-endpoint basis.
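
A minimal sketch of such a record writer follows, under a few assumptions that 
are not taken from the patch: keys are Text, values are lists of Thrift 
Columns, the batch is handed to the committer through a static task-keyed map, 
and the endpoint lookup is a hypothetical placeholder (the real writer would 
consult the ring, e.g. via the Thrift describe_ring call).

{code:java}
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class ColumnFamilyRecordWriter extends RecordWriter<Text, List<Column>> {

    // Task-specific write-back cache: endpoint -> row key -> pending mutations.
    // The output committer later drains this map, one thread per endpoint.
    static final Map<TaskAttemptID, Map<InetAddress, Map<String, List<Mutation>>>>
            BATCHES = new ConcurrentHashMap<>();

    private final Map<InetAddress, Map<String, List<Mutation>>> batch = new HashMap<>();

    public ColumnFamilyRecordWriter(TaskAttemptContext context) {
        BATCHES.put(context.getTaskAttemptID(), batch);
    }

    @Override
    public void write(Text key, List<Column> columns) {
        // Wrap each column in a Mutation and file it under the endpoint that
        // owns the row; no Thrift call is made here (lazy write-back).
        InetAddress endpoint = getEndpointFor(key.toString());
        List<Mutation> mutations = batch
                .computeIfAbsent(endpoint, e -> new HashMap<>())
                .computeIfAbsent(key.toString(), k -> new ArrayList<>());
        for (Column column : columns) {
            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
            cosc.setColumn(column);
            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(cosc);
            mutations.add(mutation);
        }
    }

    @Override
    public void close(TaskAttemptContext context) {
        // Nothing to flush here; the output committer sends the batch.
    }

    // Hypothetical placeholder: the real writer would resolve the endpoint
    // that is responsible for the given key from the ring.
    private InetAddress getEndpointFor(String key) {
        return InetAddress.getLoopbackAddress();
    }
}
{code}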

The output committer, called ColumnFamilyOutputCommitter, traverses the 
mutations collected by the record writer and sends them to the endpoints 
responsible for them. Since the total set of mutations is partitioned by 
endpoint, it can be committed using multiple threads, one per endpoint. This 
reduces the time it takes to propagate the mutations to the server, given that 
(a) the client eliminates one network hop that the server would otherwise have 
had to make, and (b) each endpoint node has to deal with only a subset of the 
total set of mutations.
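
The sketch below shows one way the committer could drain the batch collected 
above, one thread per endpoint. The configuration keys, the 9160 Thrift port, 
the consistency level, and the 0.6-era batch_mutate(keyspace, mutation_map, 
consistency) signature are assumptions for illustration, not the patch's actual 
wiring.

{code:java}
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Mutation;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

public class ColumnFamilyOutputCommitter extends OutputCommitter {

    public ColumnFamilyOutputCommitter(TaskAttemptContext context) {}

    @Override public void setupJob(JobContext context) {}
    @Override public void setupTask(TaskAttemptContext context) {}

    @Override
    public void abortTask(TaskAttemptContext context) {
        // Discard anything the record writer batched for this task attempt.
        ColumnFamilyRecordWriter.BATCHES.remove(context.getTaskAttemptID());
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) {
        // Only commit if the record writer actually batched something.
        return ColumnFamilyRecordWriter.BATCHES.containsKey(context.getTaskAttemptID());
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        String keyspace = context.getConfiguration().get("cassandra.output.keyspace");
        String columnFamily = context.getConfiguration().get("cassandra.output.columnfamily");
        Map<InetAddress, Map<String, List<Mutation>>> batch =
                ColumnFamilyRecordWriter.BATCHES.remove(context.getTaskAttemptID());

        // One thread per endpoint: each endpoint receives only the mutations
        // for the rows it is responsible for, in a single batch mutate request.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, batch.size()));
        List<Future<?>> futures = new ArrayList<>();
        for (Map.Entry<InetAddress, Map<String, List<Mutation>>> entry : batch.entrySet()) {
            futures.add(pool.submit(() -> {
                // Reshape into row key -> column family -> mutations, the
                // nesting that batch_mutate expects.
                Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<>();
                for (Map.Entry<String, List<Mutation>> row : entry.getValue().entrySet()) {
                    Map<String, List<Mutation>> byCf = new HashMap<>();
                    byCf.put(columnFamily, row.getValue());
                    mutationMap.put(row.getKey(), byCf);
                }
                TSocket socket = null;
                try {
                    socket = new TSocket(entry.getKey().getHostAddress(), 9160);
                    socket.open();
                    Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(socket));
                    client.batch_mutate(keyspace, mutationMap, ConsistencyLevel.ONE);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    if (socket != null)
                        socket.close();
                }
            }));
        }
        pool.shutdown();
        try {
            for (Future<?> future : futures)
                future.get(); // surface any per-endpoint failure as a commit failure
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
{code}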

For convenience, we also define a default reduce task, called 
ColumnFamilyOutputReducer, which collects the columns in the input value and 
maps them to the data structure expected by Cassandra. By default, it assumes 
each input value is a ColumnWritable, which denotes the name-value pair of a 
single column. This reduce task is in turn used by the attached test case, 
which maps every <key, value> pair in a sample input sequence file to a <key, 
column> pair, and then reduces them by aggregating the columns that correspond 
to the same key. Eventually, the batched <key, columns> pairs are written to 
the column family associated with the output format.
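
Under the same illustrative assumptions as above (0.6-era Thrift types with 
byte[] column names and values), such a default reducer might look like the 
sketch below. The ColumnWritable shown here is only a stand-in; its accessors 
and Writable layout are guesses, not the patch's actual definition.

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

public class ColumnFamilyOutputReducer
        extends Reducer<Text, ColumnWritable, Text, List<Column>> {

    @Override
    protected void reduce(Text key, Iterable<ColumnWritable> values, Context context)
            throws IOException, InterruptedException {
        // Aggregate all of the key's name/value pairs into Thrift Columns and
        // hand the resulting <key, columns> pair to the output format.
        List<Column> columns = new ArrayList<>();
        for (ColumnWritable value : values) {
            Column column = new Column();
            column.setName(value.getName());
            column.setValue(value.getValue());
            column.setTimestamp(System.currentTimeMillis());
            columns.add(column);
        }
        context.write(key, columns);
    }
}

// Stand-in for the patch's ColumnWritable: a Writable name/value pair that
// corresponds to a single column.
class ColumnWritable implements Writable {
    private byte[] name;
    private byte[] value;

    public ColumnWritable() {}

    public ColumnWritable(byte[] name, byte[] value) {
        this.name = name;
        this.value = value;
    }

    public byte[] getName()  { return name; }
    public byte[] getValue() { return value; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(name.length);
        out.write(name);
        out.writeInt(value.length);
        out.write(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = new byte[in.readInt()];
        in.readFully(name);
        value = new byte[in.readInt()];
        in.readFully(value);
    }
}
{code}

A job would then register these pieces in the usual way, via 
job.setReducerClass(...) and job.setOutputFormatClass(...), and point its 
configuration at the target keyspace and column family.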
