Hi Mohammed
Thanks for the hint, I should probably do that :)
As for the DF singleton:
/**
 * Lazily instantiated singleton instance of the base_data DataFrame
 */
object base_data_df {

  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
    if (instance == null) {
      // Load DataFrame with the C* data source
      instance = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "cf", "keyspace" -> "ks"))
        .load()
    }
    instance
  }
}
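A side note (not from the thread): the getInstance above has a check-then-act race, so two threads calling it concurrently could each build their own DataFrame. In Scala, a `lazy val` compiles to a synchronized, initialize-once field, so a holder built on it avoids the race. A minimal sketch, using a hypothetical generic `Holder` (names are mine, not from the thread):

```scala
// Minimal sketch of a thread-safe lazy holder (hypothetical helper).
// `lazy val` initialization is synchronized by the compiler, so the
// factory runs exactly once even under concurrent access.
class Holder[T](factory: () => T) {
  @volatile private var builds = 0                     // counts factory invocations
  lazy val instance: T = { builds += 1; factory() }    // built once, on first access
  def buildCount: Int = builds
}

object HolderDemo {
  def main(args: Array[String]): Unit = {
    val h = new Holder(() => "expensive resource")
    // Hammer the holder from several threads at once.
    val threads = (1 to 8).map(_ => new Thread(() => { h.instance; () }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(h.buildCount)   // the factory ran exactly once
  }
}
```

For the DataFrame case the factory would close over the SQLContext, e.g. `new Holder(() => sqlContext.read. ... .load())`; the `@transient` annotation would still be wanted if the holder ends up in a closure that Spark serializes.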
Bernhard
Quoting Mohammed Guller <moham...@glassbeam.com>:
You may have better luck with this question on the Spark Cassandra
Connector mailing list.
One quick question about this code from your email:
// Load DataFrame from C* data-source
val base_data = base_data_df.getInstance(sqlContext)
What exactly is base_data_df and how are you creating it?
Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
All,
I'm new to Spark and I'm having a hard time doing a simple join of two DFs.
Intent:
- I'm receiving data from Kafka via direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages
(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The relation of (Kafka)
streaming batch size to raw C* data is [several streaming messages
to millions of C* rows], BUT the join always yields exactly ONE
result [1:1] per message. After the join the resulting DF is
eventually stored to another C* table.
Problem:
- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark
is loading the whole C* data-set into memory before actually joining
(which I'd like to prevent by using the filter/predicate pushdown).
This leads to a lot of shuffling and tasks being spawned, hence the
"simple" join takes forever...
Could anyone shed some light on this? In my view this should be a
prime example of combining DataFrames and Spark Streaming.
Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2
Code:
def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")

  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg,
    StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>

    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Drop the Kafka key, keep the decoded message
    val MyMsgRdd = rdd.map { case (key, msg) => msg }

    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data source
    val base_data = base_data_df.getInstance(sqlContext)

    // Join on prim1_id and prim2_id (note: the join type passed here is
    // "left", not an inner join)
    val joinedDf = MyMsgDf.join(base_data,
        MyMsgDf("prim1_id") === base_data("prim1_id") &&
        MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields

    // Persist
  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
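An editorial aside (an assumption on my part, not from the thread): a filter like `base_data("prim1_id").isin(MyMsgDf("prim1_id"))` compares two columns row by row, so there are no literal values the connector could push down to Cassandra; pushdown generally needs constants. Since each batch holds only a handful of keys, one common workaround is to collect them on the driver and filter base_data with a literal IN clause before joining. A sketch of the predicate-building part (the `inClause` helper is hypothetical):

```scala
// Hedged sketch: build a literal IN predicate from the batch's keys so
// the filter on base_data can be pushed down, instead of a
// column-vs-column isin() that forces a full table scan.
object InClause {
  // Build e.g. "prim1_id IN ('a', 'b')" from collected batch keys.
  def inClause(column: String, keys: Seq[String]): String =
    keys.map(k => s"'$k'").mkString(s"$column IN (", ", ", ")")

  def main(args: Array[String]): Unit = {
    // In the streaming job these would come from something like
    // MyMsgDf.select("prim1_id").collect() — a few rows per batch.
    val batchKeys = Seq("a", "b")
    println(inClause("prim1_id", batchKeys))   // prints: prim1_id IN ('a', 'b')
  }
}
```

The resulting string can be passed as `base_data.filter(inClause("prim1_id", batchKeys))` before the join. Alternatively, the connector's RDD API offers joinWithCassandraTable, which fetches only the C* rows matching each key rather than scanning the table; whether either fits here depends on the schema, so treat both as directions to test, not a definitive fix.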
SO:
http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org