Hi,

To compare Spark SQL vs. CQL performance on Cassandra, I did the following:

1) Cassandra standalone server (a single-node cluster)
2) Spark master and 1 worker
Both running on a ThinkPad laptop with 4 cores and 8GB RAM.
3) Wrote Spark SQL code using the DataStax Spark Cassandra Connector
(JavaApiDemo.java, attached below; run with the arguments
spark://127.0.0.1:7077 127.0.0.1)
4) Wrote CQL code using the DataStax Java driver
(CassandraJavaApiDemo.java, attached below)

In both cases I create 1 million rows and then query for a single row by
name.

Observations:
1) The CQL query takes less than 10 milliseconds (SELECT * FROM users WHERE
name='Anna0').
2) The equivalent lookup takes around 0.6 seconds through Spark, whether via
Spark SQL (SELECT * FROM people WHERE name='Anna0') or via
javaFunctions(sc).cassandraTable("test", "people",
mapRowTo(Person.class)).where("name=?", "Anna0") (see the timing sketch
below).
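
For reference, here is a minimal sketch of how the connector path can be
timed in isolation (this assumes the test.people table, the Person bean and
a live JavaSparkContext sc from the attached JavaApiDemo.java; only the
filtered read and collect are timed, not Spark context startup):

    // Hypothetical timing snippet; all names come from the attached demo.
    long start = System.currentTimeMillis();
    List<Person> rows = javaFunctions(sc)
            .cassandraTable("test", "people", mapRowTo(Person.class))
            .where("name=?", "Anna0")
            .collect();
    long end = System.currentTimeMillis();
    System.out.println(rows.size() + " row(s) in " + (end - start) + " ms");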

Please let me know if I am missing something in the Spark configuration or
in the Spark Cassandra Connector.

Thanks
Ajay Garga

-------- CassandraJavaApiDemo.java --------

package com.datastax.demo;

import java.text.SimpleDateFormat;
import java.util.Date;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class CassandraJavaApiDemo {
	private static SimpleDateFormat format = new SimpleDateFormat(
			"HH:mm:ss.SSS");

	public static void main(String[] args) {
		Cluster cluster = null;
		Session session = null;

		try {
			cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
			session = cluster.connect();

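			// (Re)create the test2 schema: keyspace, users table, and a
			// secondary index on name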
			session.execute("DROP KEYSPACE IF EXISTS test2");
			session.execute("CREATE KEYSPACE test2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
			session.execute("CREATE TABLE test2.users (id INT, name TEXT, birth_date  TIMESTAMP, PRIMARY KEY (id) )");
			session.execute("CREATE INDEX people_name_idx2 ON test2.users(name)");

			session = cluster.connect("test2");
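
			// Populate users with 1,000,000 rows, one synchronous INSERT at a time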
			Statement insert = null;
			for (int i = 0; i < 1000000; i++) {
				insert = QueryBuilder.insertInto("users").value("id", i)
						.value("name", "Anna" + i)
						.value("birth_date", new Date());
				session.execute(insert);
			}

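			// Time a single lookup through the secondary index, with tracing enabled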
			long start = System.currentTimeMillis();
			Statement scan = new SimpleStatement(
					"SELECT * FROM users WHERE name='Anna0';");
			scan.enableTracing();
			ResultSet results = session.execute(scan);
			for (Row row : results) {
				System.out.format("%d %s\n", row.getInt("id"),
						row.getString("name"));
			}
			long end = System.currentTimeMillis();
			System.out.println(" Time Taken " +  (end - start));
			ExecutionInfo executionInfo = results.getExecutionInfo();
			QueryTrace queryTrace = executionInfo.getQueryTrace();

			System.out.printf("%-38s | %-12s | %-10s | %-12s\n", "activity",
					"timestamp", "source", "source_elapsed");
			System.out
					.println("---------------------------------------+--------------+------------+--------------");
			for (QueryTrace.Event event : queryTrace.getEvents()) {
				System.out.printf("%38s | %12s | %10s | %12s\n",
						event.getDescription(),
						millis2Date(event.getTimestamp()), event.getSource(),
						event.getSourceElapsedMicros());
			}

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (session != null) {
				session.close();
			}
			if (cluster != null) {
				cluster.close();
			}
		}
	}

	private static String millis2Date(long timestamp) {
		return format.format(timestamp);
	}
}

-------- JavaApiDemo.java --------

package com.datastax.spark.connector.demo;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Objects;

import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

/**
 * This Spark application demonstrates how to use the Spark Cassandra
 * Connector from Java.
 * <p/>
 * In order to run it, you need a running Cassandra database. The demo
 * (re)creates the following keyspace, table and secondary index at startup:
 * <p/>
 * 
 * <pre>
 * CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
 * 
 * CREATE TABLE test.people (
 *      id          INT,
 *      name        TEXT,
 *      birth_date  TIMESTAMP,
 *      PRIMARY KEY (id)
 * );
 * 
 * CREATE INDEX people_name_idx ON test.people(name);
 * </pre>
 */
public class JavaApiDemo implements Serializable {

	public JavaApiDemo(SparkConf conf) {

		JavaSparkContext sc = new JavaSparkContext(conf);
		generateData(sc);
		
		// Save some data to Cassandra: write 1,000,000 Person rows in 10
		// batches of 100,000 so the whole data set is never held in memory at
		// once. Ids are globally unique so each batch adds new rows instead
		// of overwriting the previous batch (id is the primary key).
		for (int k = 0; k < 10; k++) {
			List<Person> people = new ArrayList<Person>();
			for (int i = 0; i < 100000; i++) {
				int id = k * 100000 + i;
				people.add(Person.newInstance(id, "Anna" + id, new Date()));
			}
			JavaRDD<Person> rdd = sc.parallelize(people);
			javaFunctions(rdd).writerBuilder("test", "people",
					mapToRow(Person.class)).saveToCassandra();
		}

		// use case: we want to read that data as an RDD of CassandraRows and
		// convert them to strings...
		JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable(
				"test", "people").map(new Function<CassandraRow, String>() {
			@Override
			public String call(CassandraRow cassandraRow) throws Exception {
				return cassandraRow.toString();
			}
		});
		System.out.println("Data as CassandraRows: \n" + cassandraRowsRDD.collect().get(0));

		// use case: we want to read that data as an RDD of Person beans and
		// also convert them to strings...
		JavaRDD<String> rdd2 = javaFunctions(sc).cassandraTable("test",
				"people", mapRowTo(Person.class)).map(
				new Function<Person, String>() {
					@Override
					public String call(Person person) throws Exception {
						return person.toString();
					}
				});
		System.out.println("Data as Person beans: \n" + rdd2.collect().get(0));

		// use case: we want to filter rows on the database side with use of the
		// where clause
		JavaRDD<String> rdd3 = javaFunctions(sc)
				.cassandraTable("test", "people", mapRowTo(Person.class))
				.where("name=?", "Anna0").map(new Function<Person, String>() {
					@Override
					public String call(Person person) throws Exception {
						return person.toString();
					}
				});
		System.out
				.println("Data filtered by the where clause (name='Anna0'): \n"
						+ StringUtils.join("\n", rdd3.collect()));

		// use case: we want to explicitly set a projection on the column set
		JavaRDD<String> rdd4 = javaFunctions(sc)
				.cassandraTable("test", "people").select("id")
				.map(new Function<CassandraRow, String>() {
					@Override
					public String call(CassandraRow cassandraRow)
							throws Exception {
						return cassandraRow.toString();
					}
				});
		System.out.println("Data with only 'id' column fetched: \n" + rdd4.collect().get(0));

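		// use case: we want to query the same data through Spark SQL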
		CassandraSQLContext cc = new CassandraSQLContext(sc.sc());
		cc.setKeyspace("test");

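		// Time only the SQL query and collect(); rows are printed after the
		// clock stops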
		long start = System.currentTimeMillis();
		SchemaRDD rdd5 = cc.cassandraSql("SELECT * FROM people WHERE name='Anna0'");
		Object[] rows = (Object[]) rdd5.collect();
		long end = System.currentTimeMillis();
		System.out.println("Data got by SQL: \n" + Arrays.toString(rows));
		System.out.println(" Time Taken " + (end - start));
		sc.stop();
	}

	private void generateData(JavaSparkContext sc) {
		CassandraConnector connector = CassandraConnector.apply(sc.getConf());

		// Prepare the schema
		try (Session session = connector.openSession()) {
			session.execute("DROP KEYSPACE IF EXISTS test");
			session.execute("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
			session.execute("CREATE TABLE test.people (id INT, name TEXT, birth_date  TIMESTAMP, PRIMARY KEY (id) )");
			session.execute("CREATE INDEX people_name_idx ON test.people(name)");
		}
	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.setAppName("Java API demo");
		conf.setMaster(args[0]);
		conf.set("spark.cassandra.connection.host", args[1]);

		new JavaApiDemo(conf);
	}

	public static class Person implements Serializable {
		private Integer id;
		private String name;
		private Date birthDate;

		public static Person newInstance(Integer id, String name, Date birthDate) {
			Person person = new Person();
			person.setId(id);
			person.setName(name);
			person.setBirthDate(birthDate);
			return person;
		}

		public Integer getId() {
			return id;
		}

		public void setId(Integer id) {
			this.id = id;
		}

		public String getName() {
			return name;
		}

		public void setName(String name) {
			this.name = name;
		}

		public Date getBirthDate() {
			return birthDate;
		}

		public void setBirthDate(Date birthDate) {
			this.birthDate = birthDate;
		}

		@Override
		public String toString() {
			return Objects.toStringHelper(this).add("id", id).add("name", name)
					.add("birthDate", birthDate).toString();
		}
	}

}