Hi Bernhard,

Take a look at the examples shown under the "Pushing down clauses to
Cassandra" section on this page:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
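For example, an equality filter on the partition-key columns can be served by
Cassandra itself instead of a full table scan. A minimal sketch, reusing the
"ks"/"cf" names from your code (the key values are hypothetical, and whether a
given predicate is actually pushed down depends on your primary key layout):

    // Read the C* table through the connector's DataFrame source
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "cf", "keyspace" -> "ks"))
      .load()

    // Equality predicates on partition-key columns are eligible for pushdown
    // (123 and 456 are hypothetical key values)
    val filtered = df.filter(df("prim1_id") === 123 && df("prim2_id") === 456)

    // Verify in the physical plan: pushed predicates show up as PushedFilters
    filtered.explain(true)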
Mohammed
Author: Big Data Analytics with Spark

-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames

Hi Mohammed

Thanks for the hint, I should probably do that :)

As for the DF singleton:

    /**
     * Lazily instantiated singleton instance of the base_data DataFrame
     */
    object base_data_df {

      @transient private var instance: DataFrame = _

      def getInstance(sqlContext: SQLContext): DataFrame = {
        if (instance == null) {
          // Load the DataFrame with the C* data source
          instance = sqlContext.read
            .format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "cf", "keyspace" -> "ks"))
            .load()
        }
        instance
      }
    }
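For what it's worth, load() only builds the logical plan; no Cassandra rows
are read until an action runs, so the singleton mainly avoids rebuilding the
plan on every batch. It is fetched once per foreachRDD invocation, roughly
like this (sketch):

    // Inside foreachRDD: reuse the lazily built DataFrame across batches
    val base_data = base_data_df.getInstance(sqlContext)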
Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

> You may have better luck with this question on the Spark Cassandra
> Connector mailing list.
>
> One quick question about this code from your email:
>
>     // Load DataFrame from C* data-source
>     val base_data = base_data_df.getInstance(sqlContext)
>
> What exactly is base_data_df and how are you creating it?
>
> Mohammed
> Author: Big Data Analytics with
> Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
> -----Original Message-----
> From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
> Sent: Tuesday, February 9, 2016 6:58 AM
> To: user@spark.apache.org
> Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
>
> All,
>
> I'm new to Spark and I'm having a hard time doing a simple join of two
> DFs.
>
> Intent:
> - I'm receiving data from Kafka via a direct stream and would like to
> enrich the messages with data from Cassandra. The Kafka messages
> (Protobufs) are decoded into DataFrames and then joined with a
> (supposedly pre-filtered) DF from Cassandra. The ratio of (Kafka)
> streaming batch size to raw C* data is [several streaming messages to
> millions of C* rows], BUT the join always yields exactly ONE result
> [1:1] per message. After the join, the resulting DF is eventually
> stored in another C* table.
>
> Problem:
> - Even though I'm joining the two DFs on the full Cassandra primary
> key and pushing the corresponding filter to C*, it seems that Spark is
> loading the whole C* data set into memory before actually joining
> (which I'd like to prevent by using the filter/predicate pushdown).
> This leads to a lot of shuffling and tasks being spawned, hence the
> "simple" join takes forever...
>
> Could anyone shed some light on this? In my perception this should be
> a prime example for DFs and Spark Streaming.
>
> Environment:
> - Spark 1.6
> - Cassandra 2.1.12
> - Cassandra-Spark-Connector 1.5-RC1
> - Kafka 0.8.2.2
>
> Code:
>
>     def main(args: Array[String]) {
>       val conf = new SparkConf()
>         .setAppName("test")
>         .set("spark.cassandra.connection.host", "xxx")
>         .set("spark.cassandra.connection.keep_alive_ms", "30000")
>         .setMaster("local[*]")
>
>       val ssc = new StreamingContext(conf, Seconds(10))
>       ssc.sparkContext.setLogLevel("INFO")
>
>       // Initialise Kafka
>       val kafkaTopics = Set[String]("xxx")
>       val kafkaParams = Map[String, String](
>         "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
>         "auto.offset.reset" -> "smallest")
>
>       // Kafka stream
>       val messages = KafkaUtils.createDirectStream[String, MyMsg,
>         StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)
>
>       // Executed on the driver
>       messages.foreachRDD { rdd =>
>
>         // Create an instance of SQLContext
>         val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
>         import sqlContext.implicits._
>
>         // Drop the Kafka keys, keeping only the decoded messages
>         val MyMsgRdd = rdd.map { case (key, msg) => msg }
>
>         // Convert RDD[MyMsg] to DataFrame
>         val MyMsgDf = MyMsgRdd.toDF()
>           .select(
>             $"prim1Id" as 'prim1_id,
>             $"prim2Id" as 'prim2_id,
>             $...
>           )
>
>         // Load DataFrame from C* data-source
>         val base_data = base_data_df.getInstance(sqlContext)
>
>         // Left join on prim1_id and prim2_id
>         val joinedDf = MyMsgDf.join(base_data,
>             MyMsgDf("prim1_id") === base_data("prim1_id") &&
>             MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
>           .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
>             && base_data("prim2_id").isin(MyMsgDf("prim2_id")))
>
>         joinedDf.show()
>         joinedDf.printSchema()
>
>         // Select relevant fields
>
>         // Persist
>       }
>
>       // Start the computation
>       ssc.start()
>       ssc.awaitTermination()
>     }
>
> SO:
> http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p
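For reference, with Connector 1.5 a DataFrame join like the one above does not
push the join keys down to Cassandra; only literal filter predicates are
pushed. The connector's RDD API can push the join itself down via
joinWithCassandraTable, which issues per-key lookups so only the matching rows
are fetched. A minimal sketch, reusing the names from the code above and
assuming MyMsg exposes the prim1Id/prim2Id accessors used in the select:

    import com.datastax.spark.connector._

    // Inside foreachRDD: look up each (prim1_id, prim2_id) pair directly in
    // the C* table, so only the matching rows are read per message
    val joined = MyMsgRdd
      .map(m => (m.prim1Id, m.prim2Id))
      .joinWithCassandraTable("ks", "cf")
      .on(SomeColumns("prim1_id", "prim2_id"))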