Hi, To test Spark SQL Vs CQL performance on Cassandra, I did the following:
1) Cassandra standalone server (1 server in a cluster) 2) Spark Master and 1 Worker, both running on a ThinkPad laptop with 4 cores and 8GB RAM. 3) Written Spark SQL code using the Cassandra-Spark driver from Cassandra (JavaApiDemo.java. Run with spark://127.0.0.1:7077 127.0.0.1) 4) Written CQL code using the Java driver from Cassandra (CassandraJavaApiDemo.java). In both cases, I create 1 million rows and query for one row. Observation: 1) It takes less than 10 milliseconds using CQL (SELECT * FROM users WHERE name='Anna') 2) It takes around 0.6 seconds using Spark (either SELECT * FROM users WHERE name='Anna' or javaFunctions(sc).cassandraTable("test", "people", mapRowTo(Person.class)).where("name=?", "Anna")). Please let me know if I am missing something in the Spark configuration or the Cassandra-Spark driver. Thanks, Ajay Garga
package com.datastax.demo;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;

/**
 * CQL micro-benchmark: recreates the {@code test2} keyspace, inserts one
 * million rows through the DataStax Java driver, then times a single
 * secondary-index SELECT and prints the server-side query trace.
 */
public class CassandraJavaApiDemo {

    /**
     * DateTimeFormatter is immutable and thread-safe, unlike the
     * SimpleDateFormat it replaces; output format (HH:mm:ss.SSS) is unchanged.
     */
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

    /** Number of rows inserted before the timed query. */
    private static final int ROW_COUNT = 1000000;

    public static void main(String[] args) {
        // try-with-resources guarantees the cluster and both sessions are
        // closed. The original code leaked the schema session: the variable
        // was reassigned by a second cluster.connect(...) without closing
        // the first session.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build()) {
            try (Session schemaSession = cluster.connect()) {
                createSchema(schemaSession);
            }
            try (Session session = cluster.connect("test2")) {
                loadData(session);
                queryAndTrace(session);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Drops and recreates the test2 keyspace, table, and name index. */
    private static void createSchema(Session session) {
        session.execute("DROP KEYSPACE IF EXISTS test2");
        session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE test2.users (id INT, name TEXT, birth_date TIMESTAMP, PRIMARY KEY (id) )");
        session.execute("CREATE INDEX people_name_idx2 ON test2.users(name)");
    }

    /**
     * Inserts ROW_COUNT rows one at a time. NOTE(review): synchronous
     * per-row execute() dominates total runtime; executeAsync or batching
     * would load far faster, but behavior is kept as in the original.
     */
    private static void loadData(Session session) {
        for (int i = 0; i < ROW_COUNT; i++) {
            Statement insert = QueryBuilder.insertInto("users")
                    .value("id", i)
                    .value("name", "Anna" + i)
                    .value("birth_date", new Date());
            session.execute(insert);
        }
    }

    /**
     * Runs the traced SELECT on the indexed name column, prints the matching
     * rows, the client-side elapsed time, and the server-side query trace.
     */
    private static void queryAndTrace(Session session) {
        long start = System.currentTimeMillis();
        Statement scan = new SimpleStatement("SELECT * FROM users WHERE name='Anna0';");
        scan.enableTracing();
        ResultSet results = session.execute(scan);
        for (Row row : results) {
            System.out.format("%d %s\n", row.getInt("id"), row.getString("name"));
        }
        long end = System.currentTimeMillis();
        System.out.println(" Time Taken " + (end - start));

        ExecutionInfo executionInfo = results.getExecutionInfo();
        QueryTrace queryTrace = executionInfo.getQueryTrace();
        System.out.printf("%-38s | %-12s | %-10s | %-12s\n",
                "activity", "timestamp", "source", "source_elapsed");
        System.out.println("---------------------------------------+--------------+------------+--------------");
        for (QueryTrace.Event event : queryTrace.getEvents()) {
            System.out.printf("%38s | %12s | %10s | %12s\n",
                    event.getDescription(),
                    millis2Date(event.getTimestamp()),
                    event.getSource(),
                    event.getSourceElapsedMicros());
        }
    }

    /**
     * Formats an epoch-millisecond timestamp as HH:mm:ss.SSS in the local
     * zone. Return type narrowed from Object to String (callers unaffected).
     */
    private static String millis2Date(long timestamp) {
        return FORMAT.format(Instant.ofEpochMilli(timestamp));
    }
}
package com.datastax.spark.connector.demo;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Objects;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

/**
 * This Spark application demonstrates how to use Spark Cassandra Connector with
 * Java.
 * <p/>
 * In order to run it, you will need to run Cassandra database, and create the
 * following keyspace, table and secondary index:
 * <p/>
 *
 * <pre>
 * CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
 *
 * CREATE TABLE test.people (
 *     id INT,
 *     name TEXT,
 *     birth_date TIMESTAMP,
 *     PRIMARY KEY (id)
 * );
 *
 * CREATE INDEX people_name_idx ON test.people(name);
 * </pre>
 */
public class JavaApiDemo implements Serializable {

    /** Batches written to Cassandra; BATCHES * ROWS_PER_BATCH rows total. */
    private static final int BATCHES = 10;
    private static final int ROWS_PER_BATCH = 100000;

    public JavaApiDemo(SparkConf conf) {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);

        // Save 1,000,000 distinct rows to Cassandra in 10 batches.
        // BUG FIX: the original used id = i (0..99999) in every batch, so
        // each batch overwrote the previous one's primary keys and the table
        // ended up with only 100,000 rows instead of the intended 1,000,000.
        // Offsetting by the batch index makes every id unique; the row
        // matching the later where("name=?", "Anna0") query (id 0) is kept.
        for (int batch = 0; batch < BATCHES; batch++) {
            List<Person> people = new ArrayList<Person>(ROWS_PER_BATCH);
            for (int i = 0; i < ROWS_PER_BATCH; i++) {
                int id = batch * ROWS_PER_BATCH + i;
                people.add(Person.newInstance(id, "Anna" + id, new Date()));
            }
            JavaRDD<Person> rdd = sc.parallelize(people);
            javaFunctions(rdd).writerBuilder("test", "people", mapToRow(Person.class))
                    .saveToCassandra();
        }

        // use case: we want to read that data as an RDD of CassandraRows and
        // convert them to strings...
        JavaRDD<String> cassandraRowsRDD = javaFunctions(sc)
                .cassandraTable("test", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data as CassandraRows: \n" + cassandraRowsRDD.collect().get(0));

        // use case: we want to read that data as an RDD of Person beans and
        // also convert them to strings...
        JavaRDD<String> rdd2 = javaFunctions(sc)
                .cassandraTable("test", "people", mapRowTo(Person.class))
                .map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + rdd2.collect().get(0));

        // use case: we want to filter rows on the database side with use of the
        // where clause
        JavaRDD<String> rdd3 = javaFunctions(sc)
                .cassandraTable("test", "people", mapRowTo(Person.class))
                .where("name=?", "Anna0")
                .map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        // BUG FIX: message said name='Anna' but the query above binds "Anna0".
        System.out.println("Data filtered by the where clause (name='Anna0'): \n"
                + StringUtils.join("\n", rdd3.collect()));

        // use case: we want to explicitly set a projection on the column set
        JavaRDD<String> rdd4 = javaFunctions(sc)
                .cassandraTable("test", "people")
                .select("id")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data with only 'id' column fetched: \n" + rdd4.collect().get(0));

        // Time a Spark SQL query against the same table.
        // NOTE(review): this measures whole-job latency (task scheduling,
        // full-table scan feeding the filter), not a single indexed lookup,
        // so it is not directly comparable to a CQL point query.
        CassandraSQLContext cc = new CassandraSQLContext(sc.sc());
        cc.setKeyspace("test");
        long start = System.currentTimeMillis();
        SchemaRDD rdd5 = cc.cassandraSql("SELECT * FROM people WHERE name='Anna0'");
        System.out.println("Data got by SQL: \n" + rdd5.collect());
        long end = System.currentTimeMillis();
        System.out.println(" Time Taken " + (end - start));

        sc.stop();
    }

    /** Drops and recreates the test keyspace, people table, and name index. */
    private void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS test");
            session.execute("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE test.people (id INT, name TEXT, birth_date TIMESTAMP, PRIMARY KEY (id) )");
            session.execute("CREATE INDEX people_name_idx ON test.people(name)");
        }
    }

    public static void main(String[] args) {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when arguments are missing.
        if (args.length < 2) {
            System.err.println("Usage: JavaApiDemo <spark-master-url> <cassandra-host>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);
        new JavaApiDemo(conf);
    }

    /**
     * Mutable JavaBean mapped to test.people by the connector's
     * mapRowTo/mapToRow; java.util.Date is retained because the connector
     * maps it to the Cassandra TIMESTAMP column.
     */
    public static class Person implements Serializable {
        private Integer id;
        private String name;
        private Date birthDate;

        /** Static factory used by the data-generation loop. */
        public static Person newInstance(Integer id, String name, Date birthDate) {
            Person person = new Person();
            person.setId(id);
            person.setName(name);
            person.setBirthDate(birthDate);
            return person;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Date getBirthDate() {
            return birthDate;
        }

        public void setBirthDate(Date birthDate) {
            this.birthDate = birthDate;
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                    .add("id", id)
                    .add("name", name)
                    .add("birthDate", birthDate)
                    .toString();
        }
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org