Hi,
To compare Spark SQL vs. CQL performance on Cassandra, I did the following:
1) Cassandra standalone server (1 server in a cluster)
2) Spark Master and 1 Worker
Both run on a ThinkPad laptop with 4 cores and 8 GB RAM.
3) Wrote Spark SQL code using the DataStax Spark Cassandra Connector
(JavaApiDemo.java below; run with arguments spark://127.0.0.1:7077 127.0.0.1)
4) Wrote CQL code using the DataStax Java driver
(CassandraJavaApiDemo.java below)
In both cases, I create 1 million rows and then query for a single one.
Observations:
1) The CQL query (SELECT * FROM users WHERE name='Anna0') takes less than
10 milliseconds.
2) The Spark query takes around 0.6 seconds, whether issued as
SELECT * FROM people WHERE name='Anna0' or as
javaFunctions(sc).cassandraTable("test", "people",
mapRowTo(Person.class)).where("name=?", "Anna0").
Please let me know if I am missing something in the Spark configuration or
in my use of the Spark Cassandra Connector.
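One factor I have not yet ruled out is Spark's one-time job-launch overhead:
the first action on a context pays for task scheduling, classloading and
executor warm-up, while a CQL statement is a single round trip. A fairer
comparison might time a second, identical query after a warm-up run. A
minimal sketch that could drop into the JavaApiDemo constructor below (the
warm-up step is my addition; I have not measured it yet):

    // Warm up: the first action absorbs one-time job-setup costs.
    CassandraSQLContext cc = new CassandraSQLContext(sc.sc());
    cc.setKeyspace("test");
    cc.cassandraSql("SELECT * FROM people WHERE name='Anna0'").collect();

    // Timed run: measures the steady-state cost of the same query.
    long start = System.currentTimeMillis();
    cc.cassandraSql("SELECT * FROM people WHERE name='Anna0'").collect();
    long end = System.currentTimeMillis();
    System.out.println("Warm query time: " + (end - start) + " ms");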
Thanks
Ajay Garga
// CassandraJavaApiDemo.java
package com.datastax.demo;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
public class CassandraJavaApiDemo {

    private static SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");

    public static void main(String[] args) {
        Cluster cluster = null;
        Session session = null;
        try {
            cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

            // Recreate the schema from scratch.
            session = cluster.connect();
            session.execute("DROP KEYSPACE IF EXISTS test2");
            session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE test2.users (id INT, name TEXT, birth_date TIMESTAMP, PRIMARY KEY (id))");
            session.execute("CREATE INDEX people_name_idx2 ON test2.users(name)");
            session.close();

            // Reconnect with test2 as the default keyspace.
            session = cluster.connect("test2");

            // Load 1 million rows, one synchronous INSERT at a time.
            Statement insert;
            for (int i = 0; i < 1000000; i++) {
                insert = QueryBuilder.insertInto("users")
                        .value("id", i)
                        .value("name", "Anna" + i)
                        .value("birth_date", new Date());
                session.execute(insert);
            }

            // Time a single indexed lookup, including result iteration.
            long start = System.currentTimeMillis();
            Statement scan = new SimpleStatement("SELECT * FROM users WHERE name='Anna0'");
            scan.enableTracing();
            ResultSet results = session.execute(scan);
            for (Row row : results) {
                System.out.format("%d %s\n", row.getInt("id"), row.getString("name"));
            }
            long end = System.currentTimeMillis();
            System.out.println(" Time Taken " + (end - start));

            // Print the server-side query trace.
            ExecutionInfo executionInfo = results.getExecutionInfo();
            QueryTrace queryTrace = executionInfo.getQueryTrace();
            System.out.printf("%-38s | %-12s | %-10s | %-12s\n",
                    "activity", "timestamp", "source", "source_elapsed");
            System.out.println("---------------------------------------+--------------+------------+--------------");
            for (QueryTrace.Event event : queryTrace.getEvents()) {
                System.out.printf("%38s | %12s | %10s | %12s\n",
                        event.getDescription(),
                        millis2Date(event.getTimestamp()),
                        event.getSource(),
                        event.getSourceElapsedMicros());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (session != null) {
                session.close();
            }
            if (cluster != null) {
                cluster.close();
            }
        }
    }

    private static String millis2Date(long timestamp) {
        return format.format(timestamp);
    }
}
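As an aside, the 1-million-row load above executes each INSERT synchronously,
so data generation dominates the run time. A prepared statement with
asynchronous execution should speed it up considerably. A rough sketch,
assuming the same session as above; the window of 1000 in-flight requests is
an arbitrary, untuned choice, and it needs extra imports for
PreparedStatement, ResultSetFuture, java.util.List and java.util.ArrayList:

    PreparedStatement ps = session.prepare(
            "INSERT INTO users (id, name, birth_date) VALUES (?, ?, ?)");
    List<ResultSetFuture> inFlight = new ArrayList<ResultSetFuture>();
    for (int i = 0; i < 1000000; i++) {
        inFlight.add(session.executeAsync(ps.bind(i, "Anna" + i, new Date())));
        if (inFlight.size() == 1000) {
            // Drain the window before issuing more requests, to avoid
            // overwhelming the single node.
            for (ResultSetFuture f : inFlight) {
                f.getUninterruptibly();
            }
            inFlight.clear();
        }
    }
    for (ResultSetFuture f : inFlight) {
        f.getUninterruptibly();
    }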
// JavaApiDemo.java
package com.datastax.spark.connector.demo;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Objects;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.cassandra.CassandraSQLContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
/**
 * This Spark application demonstrates how to use the Spark Cassandra
 * Connector with Java.
 * <p/>
 * In order to run it, you will need to run a Cassandra database and create
 * the following keyspace, table and secondary index:
 * <p/>
 *
 * <pre>
 * CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
 *
 * CREATE TABLE test.people (
 *     id INT,
 *     name TEXT,
 *     birth_date TIMESTAMP,
 *     PRIMARY KEY (id)
 * );
 *
 * CREATE INDEX people_name_idx ON test.people(name);
 * </pre>
 */
public class JavaApiDemo implements Serializable {

    public JavaApiDemo(SparkConf conf) {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);

        // here we are going to save some data to Cassandra...
        // List<Person> people = Arrays.asList(
        //         Person.newInstance(1, "John", new Date()),
        //         Person.newInstance(2, "Anna", new Date()),
        //         Person.newInstance(3, "Andrew", new Date()));

        // Save 1 million rows in 10 batches of 100,000. The id is derived
        // from both loop counters so that every row is distinct (using i
        // alone would overwrite the same 100,000 ids ten times).
        List<Person> people;
        JavaRDD<Person> rdd;
        for (int k = 0; k < 10; k++) {
            people = new ArrayList<Person>();
            for (int i = 0; i < 100000; i++) {
                int id = k * 100000 + i;
                people.add(Person.newInstance(id, "Anna" + id, new Date()));
            }
            rdd = sc.parallelize(people);
            javaFunctions(rdd).writerBuilder("test", "people",
                    mapToRow(Person.class)).saveToCassandra();
        }

        // use case: we want to read that data as an RDD of CassandraRows
        // and convert them to strings...
        JavaRDD<String> cassandraRowsRDD = javaFunctions(sc)
                .cassandraTable("test", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data as CassandraRows: \n" + cassandraRowsRDD.collect().get(0));

        // use case: we want to read that data as an RDD of Person beans and
        // also convert them to strings...
        JavaRDD<String> rdd2 = javaFunctions(sc)
                .cassandraTable("test", "people", mapRowTo(Person.class))
                .map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + rdd2.collect().get(0));

        // use case: we want to filter rows on the database side with use of
        // the where clause
        JavaRDD<String> rdd3 = javaFunctions(sc)
                .cassandraTable("test", "people", mapRowTo(Person.class))
                .where("name=?", "Anna0")
                .map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data filtered by the where clause (name='Anna0'): \n"
                + StringUtils.join("\n", rdd3.collect()));

        // use case: we want to explicitly set a projection on the column set
        JavaRDD<String> rdd4 = javaFunctions(sc)
                .cassandraTable("test", "people").select("id")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data with only 'id' column fetched: \n" + rdd4.collect().get(0));

        // Time the Spark SQL query, including the collect() that actually
        // runs the job.
        CassandraSQLContext cc = new CassandraSQLContext(sc.sc());
        cc.setKeyspace("test");
        long start = System.currentTimeMillis();
        SchemaRDD rdd5 = cc.cassandraSql("SELECT * FROM people WHERE name='Anna0'");
        System.out.println("Data got by SQL: \n" + Arrays.toString(rdd5.collect()));
        long end = System.currentTimeMillis();
        System.out.println(" Time Taken " + (end - start));

        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        // Prepare the schema.
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS test");
            session.execute("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE test.people (id INT, name TEXT, birth_date TIMESTAMP, PRIMARY KEY (id))");
            session.execute("CREATE INDEX people_name_idx ON test.people(name)");
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);                               // e.g. spark://127.0.0.1:7077
        conf.set("spark.cassandra.connection.host", args[1]);  // e.g. 127.0.0.1
        new JavaApiDemo(conf);
    }
    public static class Person implements Serializable {
        private Integer id;
        private String name;
        private Date birthDate;

        public static Person newInstance(Integer id, String name, Date birthDate) {
            Person person = new Person();
            person.setId(id);
            person.setName(name);
            person.setBirthDate(birthDate);
            return person;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Date getBirthDate() {
            return birthDate;
        }

        public void setBirthDate(Date birthDate) {
            this.birthDate = birthDate;
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this).add("id", id).add("name", name)
                    .add("birthDate", birthDate).toString();
        }
    }
}
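For completeness, the connector's where() path can also be timed directly;
it pushes the predicate down to Cassandra's secondary index, just like the
plain CQL statement, so it should be the closest apples-to-apples comparison.
A sketch that would drop into the JavaApiDemo constructor (not measured yet):

    // Time the server-side where() lookup, including the collect().
    long t0 = System.currentTimeMillis();
    List<Person> hits = javaFunctions(sc)
            .cassandraTable("test", "people", mapRowTo(Person.class))
            .where("name=?", "Anna0")
            .collect();
    long t1 = System.currentTimeMillis();
    System.out.println(hits.size() + " row(s) in " + (t1 - t0) + " ms");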