Brian Hess created CASSANDRA-9259:
--------------------------------------
Summary: Bulk Reading from Cassandra
Key: CASSANDRA-9259
URL: https://issues.apache.org/jira/browse/CASSANDRA-9259
Project: Cassandra
Issue Type: Improvement
Components: Core
Reporter: Brian Hess
This ticket follows on from the 2015 NGCC. It is intended as a place to
discuss and design an approach to bulk reading.
The goal is to have a bulk reading path for Cassandra. That is, a path
optimized to grab a large portion of the data for a table (potentially all of
it). This is a core element in the Spark integration with Cassandra, and the
speed at which Cassandra can deliver bulk data to Spark is limiting the
performance of Spark-plus-Cassandra operations. This is especially
important as Cassandra will (likely) leverage Spark for internal operations
(for example, CASSANDRA-8234).
The core CQL to consider is the following:
SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND
Token(partitionKey) <= Y
Here, we choose X and Y to be contained within one token range (perhaps
considering the primary range of a node without vnodes, for example). This
query pushes 50K-100K rows/sec, which is not very fast if we are doing bulk
operations via Spark (or other processing frameworks, ETL tools, etc). There
are a few causes (e.g., inefficient paging).
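
As a rough illustration of how a client might carve a token range into such
(X, Y] queries, here is a minimal Python sketch. It assumes the
Murmur3Partitioner token space (signed 64-bit). The helper names
split_token_range and range_query are hypothetical and not part of any
driver; real connectors may choose their splits differently (e.g., by
estimated data size).

```python
# Assumption: Murmur3Partitioner, whose tokens are signed 64-bit integers.
MIN_TOKEN = -(2**63)
MAX_TOKEN = 2**63 - 1


def split_token_range(start, end, pieces):
    """Split (start, end] into `pieces` contiguous (X, Y] sub-ranges."""
    if start >= end:
        raise ValueError("start must be less than end")
    if not 1 <= pieces <= end - start:
        raise ValueError("pieces must be between 1 and the range width")
    width = end - start
    # Integer arithmetic keeps the bounds exact for 64-bit token values.
    bounds = [start + (width * i) // pieces for i in range(pieces)] + [end]
    return list(zip(bounds[:-1], bounds[1:]))


def range_query(keyspace, table, columns, partition_key, x, y):
    """Build the token-range SELECT from the ticket for one (x, y] slice."""
    return (
        f"SELECT {', '.join(columns)} FROM {keyspace}.{table} "
        f"WHERE token({partition_key}) > {x} AND token({partition_key}) <= {y}"
    )


if __name__ == "__main__":
    # Hypothetical: cover the whole ring with 4 slices.
    for x, y in split_token_range(MIN_TOKEN, MAX_TOKEN, 4):
        print(range_query("myKs", "myTable", ["a", "b", "c"], "partitionKey", x, y))
```

Each slice can be issued independently, so the slices can be read in
parallel. This does not address the per-query cost described above.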
There are a few approaches that could be considered. First, we consider a new
"Streaming Compaction" approach. The key observation here is that a bulk read
from Cassandra is a lot like a major compaction, though instead of outputting a
new SSTable we would output CQL rows to a stream/socket/etc. This would be
similar to a CompactionTask, but would skip work that is unnecessary for a
read (e.g., some of the indexing). Predicates and projections could also
be encapsulated in this new "StreamingCompactionTask", for example.
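
To make the "compaction that emits rows" idea concrete, here is a toy
Python model. It is only a sketch under heavy assumptions and does not
reflect Cassandra's actual CompactionTask code: each "SSTable" is an
in-memory list of cells already sorted by (token, column), a value of None
stands in for a tombstone, and the function name stream_rows is
hypothetical.

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Toy cell layout (an assumption): (token, column, write_timestamp, value).


def stream_rows(sstables, columns=None, predicate=None):
    """Merge sorted runs and yield reconciled rows instead of writing a new run."""
    # K-way merge of the sorted inputs, as a compaction would do.
    merged = heapq.merge(*sstables, key=itemgetter(0, 1))
    for token, cells in groupby(merged, key=itemgetter(0)):
        row, stamps = {}, {}
        for _, column, ts, value in cells:
            # Newest write wins; on a timestamp tie the first cell seen is
            # kept, which is a simplification.
            if column not in stamps or ts > stamps[column]:
                stamps[column] = ts
                row[column] = value
        # Drop columns whose newest cell is a tombstone.
        row = {c: v for c, v in row.items() if v is not None}
        if not row:
            continue
        # The predicate sees the full row; projection is applied afterwards.
        if predicate is not None and not predicate(row):
            continue
        if columns is not None:
            row = {c: row[c] for c in columns if c in row}
        yield token, row


if __name__ == "__main__":
    older = [(1, "a", 10, "x"), (1, "b", 10, "y"), (2, "a", 10, "z")]
    newer = [(1, "a", 20, "x2"), (2, "a", 30, None)]
    for token, row in stream_rows([older, newer], columns=["a", "b"]):
        print(token, row)
```

Because the function is a generator, rows are produced as the merge
advances, which mirrors the idea of writing to a stream or socket rather
than materializing a new SSTable.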
Another approach would be an alternate storage format. For example, we might
employ Parquet (just as an example) to store the same data as in the primary
Cassandra storage (aka SSTables). This is akin to Global Indexes (an alternate
storage of the same data optimized for a particular query). Then, Cassandra
can choose to leverage this alternate storage for particular CQL queries (e.g.,
range scans).
These are just 2 suggestions to get the conversation going.
One thing to note is that it will be useful to have this storage segregated by
token range so that when you extract via these mechanisms you do not get
replication-factor copies of the data. Duplicates will certainly be an
issue for some Spark operations (e.g., counting). Thus, we will want
per-token-range storage (even for single disks), so this will likely leverage
CASSANDRA-6696 (though we will also want to consider the single-disk case).
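
The duplication concern can be shown with a small Python simulation. This
is a sketch only: it assumes a single-token-per-node ring with
SimpleStrategy-like placement (the owner plus the next RF-1 nodes
clockwise), and all function names are hypothetical.

```python
import bisect


def replicas_for(token, ring, rf):
    """ring: sorted list of (node_token, node_name); rf must be <= len(ring).

    A node owns the range (previous_node_token, node_token], wrapping around.
    """
    tokens = [t for t, _ in ring]
    i = bisect.bisect_left(tokens, token) % len(ring)
    return [ring[(i + k) % len(ring)][1] for k in range(rf)]


def place(row_tokens, ring, rf):
    """Return what each node stores locally once replication is applied."""
    stored = {name: [] for _, name in ring}
    for t in row_tokens:
        for name in replicas_for(t, ring, rf):
            stored[name].append(t)
    return stored


def naive_count(stored):
    """Count everything on every node: each row is seen rf times."""
    return sum(len(tokens) for tokens in stored.values())


def primary_count(stored, ring):
    """Count only rows falling in each node's primary range: seen once."""
    return sum(
        1
        for name, tokens in stored.items()
        for t in tokens
        if replicas_for(t, ring, 1)[0] == name
    )


if __name__ == "__main__":
    ring = [(0, "n1"), (100, "n2"), (200, "n3")]
    stored = place([-5, 50, 100, 150, 250], ring, rf=2)
    print(naive_count(stored), primary_count(stored, ring))
```

With storage segregated by token range, the filter in primary_count would
become a matter of choosing which files to read rather than testing every
row.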
It is also worth discussing what the success criteria are here. It is unlikely
to be as fast as EDW or HDFS performance (though that is still a good goal),
but success should be defined as coming within some factor of that
performance. For example, taking no more than 2x as long as bulk operations
on HDFS with similar node count/size/etc.
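
For a sense of scale, the arithmetic can be written down as a short Python
sketch. The 50K-100K rows/sec figures come from the range query described
earlier; the table size of one billion rows and the helper names are
hypothetical, chosen only for illustration.

```python
def scan_seconds(total_rows, rows_per_sec):
    """Time to read total_rows at a steady rows_per_sec rate."""
    return total_rows / rows_per_sec


def meets_target(cassandra_seconds, hdfs_seconds, factor=2.0):
    """True if the Cassandra read takes at most `factor` times the HDFS read."""
    return cassandra_seconds <= factor * hdfs_seconds


TABLE_ROWS = 1_000_000_000  # hypothetical table size

slow = scan_seconds(TABLE_ROWS, 50_000)   # 20000.0 s, about 5.6 hours
fast = scan_seconds(TABLE_ROWS, 100_000)  # 10000.0 s, about 2.8 hours
```

The HDFS timing to compare against would have to be measured on a
comparable cluster; none is assumed here.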