[ https://issues.apache.org/jira/browse/CASSANDRA-1101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12871372#action_12871372 ]

Karthick Sankarachary commented on CASSANDRA-1101:
--------------------------------------------------

{quote}It doesn't look like the JobContext.IO_SORT_MB property you use in the 
test exists in Hadoop 0.20.1, which trunk is using. Is that property 
necessary?{quote}

It's gone. At one point, I was running out of memory, and copped out by setting 
that limit lower, but I now believe that it's a non-issue.

{quote}ColumnFamilyOutputCommitter is still in the latest patch: can you remove 
it?{quote}

Done, sorry about that.

{quote}ColumnFamilyOutputFormatTest throws a ClassNotFoundException, since not 
all of Hadoop's dependencies are being pulled in. Can you add the dependencies 
to ivy.xml, or record the minimal set of dependencies which we can add manually 
to the lib/ folder?{quote}

As it turns out, the missing dependency was commons-logging, which I've added 
to ivy.xml. Just to clarify, this feature works with the existing Hadoop 0.20.1 
library, and appears to be forward compatible with later versions as well.

{quote}Also, one more nitpick: can you update the code to follow 
http://wiki.apache.org/cassandra/CodeStyle ?{quote}

Done. 

On a related note, I had to modify my test case so that it works in standalone 
mode and, more importantly, in the context of the ant script. Previously, the 
test case assumed that a Cassandra server would already be running locally. 
Now, it automatically kicks off an embedded Cassandra server (see 
org.apache.cassandra.EmbeddedServer), which by default uses a Thrift-based 
daemon, but may be overridden to use an Avro-based daemon, if so desired. Note 
that there's a similar service called EmbeddedCassandraService, but it appears 
to be tied to Thrift, and moreover, does not follow the exact same lifecycle 
that the daemons define in their main methods.
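
In outline, the helper boils down to something like the sketch below. This is a rough sketch rather than the patch itself: it assumes the Thrift daemon lives at org.apache.cassandra.thrift.CassandraDaemon and exposes the usual blocking main(String[]), and the reflective lookup is only there to show how the Avro daemon could be swapped in.

{code:java}
// Rough sketch of the embedded-server idea, not the committed helper.
public class EmbeddedServerSketch
{
    protected String daemonClass()
    {
        // Thrift-based daemon by default; subclasses may return the Avro one.
        return "org.apache.cassandra.thrift.CassandraDaemon";
    }

    public void start() throws Exception
    {
        final java.lang.reflect.Method main =
            Class.forName(daemonClass()).getMethod("main", String[].class);
        Thread thread = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    // main() blocks for the life of the server, so run it on
                    // a daemon thread and let JVM shutdown tear it down.
                    main.invoke(null, (Object) new String[0]);
                }
                catch (Exception e)
                {
                    throw new RuntimeException(e);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }
}
{code}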

Regards,
Karthick

> A Hadoop Output Format That Targets Cassandra
> ---------------------------------------------
>
>                 Key: CASSANDRA-1101
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1101
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Hadoop
>    Affects Versions: 0.6.1
>            Reporter: Karthick Sankarachary
>            Assignee: Stu Hood
>         Attachments: CASSANDRA-1101-V1.patch, CASSANDRA-1101-V2.patch, 
> CASSANDRA-1101-V3.patch, CASSANDRA-1101.patch
>
>
> Currently, there exists a Hadoop-specific input format (viz., 
> ColumnFamilyInputFormat) that allows one to iterate over the rows in a given 
> Cassandra column family and treat them as the input to a Hadoop map task. By 
> the same token, one may need to feed the output of a Hadoop reduce task into 
> a Cassandra column family, for which no mechanism exists today. This calls 
> for a Hadoop-specific output format that accepts a key and its associated 
> columns and writes them out to a given column family.
>
> Here, we describe an output format known as ColumnFamilyOutputFormat, which 
> allows reduce tasks to persist keys and their associated columns as Cassandra 
> rows in a given column family. By default, it prevents overwriting existing 
> rows by ensuring, at initialization time, that the column family contains no 
> rows matching the given slice predicate. For the sake of speed, it employs a 
> lazy write-back caching mechanism: its record writer batches up mutations 
> derived from the reduce task's inputs (in a task-specific map) but stops 
> short of actually mutating the rows. The latter responsibility falls to its 
> output committer, which makes the changes official by sending a batch mutate 
> request to Cassandra.
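>
> As an illustration, wiring the format into a Hadoop 0.20 job would look 
> roughly like the sketch below. The two configuration keys are hypothetical 
> placeholders for whichever names the patch actually uses, and the package is 
> assumed to match ColumnFamilyInputFormat's.
> {code:java}
> import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.Job;
>
> public class JobDriverSketch
> {
>     public static void main(String[] args) throws Exception
>     {
>         Configuration conf = new Configuration();
>         // Hypothetical keys naming the target keyspace and column family.
>         conf.set("cassandra.output.keyspace", "Keyspace1");
>         conf.set("cassandra.output.columnfamily", "Standard1");
>
>         Job job = new Job(conf, "write-to-cassandra");
>         job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>         // ... mapper, reducer, input format, and key/value types as usual ...
>         job.waitForCompletion(true);
>     }
> }
> {code}
>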
> The record writer, which is called ColumnFamilyRecordWriter, maps the input 
> <key, value> pairs to a Cassandra column family. In particular, it creates 
> mutations for each column in the value, which it then associates with the 
> key and, in turn, with the responsible endpoint. Given that round trips to 
> the server are fairly expensive, it merely batches the mutations in memory 
> and leaves it to the output committer to send them to the server. 
> Furthermore, the writer groups the mutations by the endpoint responsible for 
> the rows being affected, which allows the output committer to execute the 
> mutations in parallel, on an endpoint-by-endpoint basis.
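>
> The batching logic is, in essence, a two-level map. A minimal sketch of the 
> idea (Mutation and the ring lookup are placeholders, not the patch's types):
> {code:java}
> import java.net.InetAddress;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> // write() only files mutations away, keyed first by the endpoint that
> // owns the row and then by the row key; nothing touches the network here.
> public class RecordWriterSketch<Mutation>
> {
>     private final Map<InetAddress, Map<String, List<Mutation>>> batches =
>         new HashMap<InetAddress, Map<String, List<Mutation>>>();
>
>     public void write(String key, Mutation mutation, InetAddress endpoint)
>     {
>         Map<String, List<Mutation>> rows = batches.get(endpoint);
>         if (rows == null)
>             batches.put(endpoint, rows = new HashMap<String, List<Mutation>>());
>         List<Mutation> mutations = rows.get(key);
>         if (mutations == null)
>             rows.put(key, mutations = new ArrayList<Mutation>());
>         mutations.add(mutation);
>     }
>
>     public Map<InetAddress, Map<String, List<Mutation>>> getBatches()
>     {
>         return batches; // consumed by the output committer at commit time
>     }
> }
> {code}
>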
> The output committer, which is called ColumnFamilyOutputCommitter, traverses 
> the mutations collected by the record writer, and sends them to the endpoints 
> responsible for them. Since the total set of mutations is partitioned by 
> endpoint, and each partition can be applied independently, the mutations can 
> be committed using multiple threads, one per endpoint. As a result, the time 
> it takes to propagate the mutations to the server drops, considering that 
> (a) the client eliminates one network hop that the server would otherwise 
> have had to make and (b) each endpoint node deals with only a subset of the 
> total set of mutations.
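>
> In other words, the commit step amounts to a thread pool with one task per 
> endpoint, along these lines (sendBatch() stands in for the actual Thrift 
> batch mutate call):
> {code:java}
> import java.net.InetAddress;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
>
> public abstract class CommitterSketch<Batch>
> {
>     // Flush every endpoint's batch in parallel, one thread per endpoint.
>     public void commit(Map<InetAddress, Batch> batches) throws Exception
>     {
>         if (batches.isEmpty())
>             return;
>         ExecutorService pool = Executors.newFixedThreadPool(batches.size());
>         try
>         {
>             List<Future<?>> flushes = new ArrayList<Future<?>>();
>             for (final Map.Entry<InetAddress, Batch> entry : batches.entrySet())
>             {
>                 flushes.add(pool.submit(new Runnable()
>                 {
>                     public void run()
>                     {
>                         sendBatch(entry.getKey(), entry.getValue());
>                     }
>                 }));
>             }
>             for (Future<?> flush : flushes)
>                 flush.get(); // surfaces any per-endpoint failure
>         }
>         finally
>         {
>             pool.shutdown();
>         }
>     }
>
>     // Placeholder: connect to the endpoint and issue one batch mutate call.
>     protected abstract void sendBatch(InetAddress endpoint, Batch batch);
> }
> {code}
>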
> For convenience, we also define a default reduce task, called 
> ColumnFamilyOutputReducer, which collects the columns in the input value and 
> maps them to a data structure expected by Cassandra. By default, it assumes 
> the input value to be a ColumnWritable, which denotes a name-value pair 
> corresponding to a certain column. This reduce task is in turn 
> used by the attached test case, which maps every <key, value> pair in a 
> sample input sequence file to a <key, column> pair, and then reduces them by 
> aggregating columns corresponding to the same key. Eventually, the batched 
> <key, columns> pairs are written to the column family associated with the 
> output format.
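>
> Concretely, the default reducer is little more than the following sketch 
> (ColumnWritable is the patch's class; only its use as an opaque value is 
> assumed here):
> {code:java}
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Reducer;
>
> public class ReducerSketch
>     extends Reducer<Text, ColumnWritable, Text, List<ColumnWritable>>
> {
>     // Gather all columns seen for a key and emit them as one row's worth
>     // of columns; the output format turns the list into mutations.
>     @Override
>     protected void reduce(Text key, Iterable<ColumnWritable> columns, Context context)
>         throws IOException, InterruptedException
>     {
>         List<ColumnWritable> collected = new ArrayList<ColumnWritable>();
>         for (ColumnWritable column : columns)
>             collected.add(column); // a real impl must copy: Hadoop reuses objects
>         context.write(key, collected);
>     }
> }
> {code}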
