Hi Mohammed
I'm aware of that documentation; what are you hinting at specifically?
I'm pushing all elements of the partition key, so that should work. As
user zero323 on SO pointed out, the problem is most probably related
to the dynamic nature of the predicate elements (two distributed
collections per filter per join).
IMO, the statement "To push down partition keys, all of them must be
included, but not more than one predicate per partition key, otherwise
nothing is pushed down." does not apply here, or does it?
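To illustrate the two conditions discussed above, here is a small Scala model of them. It is a hypothetical sketch, not the connector's code: `PredValue`, `Literals`, `ColumnRef`, `Pred` and `pushedDown` are invented names, and treating `prim1_id` and `prim2_id` as the partition key columns is an assumption. It encodes the documented rule (every partition key column restricted, by at most one predicate each) together with zero323's point: a predicate whose values come from another distributed collection is a column reference rather than a list of constants, so it cannot be turned into a data-source filter when the query is planned.

```scala
// Hypothetical model of the pushdown rule discussed in this thread; not connector code.
sealed trait PredValue
// Values known when the query is planned (e.g. an IN list of constants)
final case class Literals(values: Seq[Any]) extends PredValue
// Values taken from a column of another DataFrame, only known at run time
final case class ColumnRef(name: String) extends PredValue

final case class Pred(column: String, value: PredValue)

object PushdownModel {
  /** Returns the partition-key predicates that would be pushed down, or None if nothing is. */
  def pushedDown(partitionKeys: Set[String], preds: Seq[Pred]): Option[Seq[Pred]] = {
    // Only literal-valued predicates can be expressed as a filter for the data source
    val translatable = preds.collect { case p @ Pred(_, Literals(_)) => p }
    val onPartitionKeys = translatable.filter(p => partitionKeys.contains(p.column))
    val byKey = onPartitionKeys.groupBy(_.column)
    // Documented rule: every partition key restricted, at most one predicate per key
    val allKeysCovered = partitionKeys.forall(byKey.contains)
    val onePerKey = byKey.values.forall(_.size == 1)
    if (allKeysCovered && onePerKey) Some(onPartitionKeys) else None
  }
}
```

In this model, column-valued predicates like the ones in the original join push nothing down, while the same keys supplied as literal IN lists satisfy the rule.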
Bernhard
Quoting Mohammed Guller <moham...@glassbeam.com>:
Hi Bernhard,
Take a look at the examples shown under the "Pushing down clauses to
Cassandra" section on this page:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
Mohammed
Author: Big Data Analytics with Spark
-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 10:05 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] Joining Kafka and Cassandra DataFrames
Hi Mohammed
Thanks for the hint, I should probably do that :)
As for the DF singleton:
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
 * Lazily instantiated singleton instance of the base_data DataFrame
 */
object base_data_df {
  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
    if (instance == null) {
      // Load DataFrame with C* data source; this only defines the
      // relation, table rows are read when an action runs
      instance = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "cf", "keyspace" -> "ks"))
        .load()
    }
    instance
  }
}
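The `base_data_df` object follows the same lazily-instantiated-singleton pattern as the `SQLContextSingleton` in the Spark Streaming programming guide. It caches only the DataFrame definition, which is a lazy query plan: each action on it still reads from Cassandra unless the DataFrame is explicitly cached. Below is a hypothetical, Spark-free sketch of the same pattern with the check done under a lock, so two concurrent callers cannot both run the initialiser; `LazySingleton` and `factory` are illustrative names, not Spark APIs.

```scala
// Hypothetical, Spark-free sketch of a lazily initialised singleton holder.
final class LazySingleton[T](factory: () => T) {
  // None until the first get(); only read or written while holding this object's lock
  private var instance: Option[T] = None

  /** Runs `factory` on the first call only; later calls return the cached value. */
  def get(): T = synchronized {
    instance match {
      case Some(value) => value
      case None =>
        val value = factory()
        instance = Some(value)
        value
    }
  }
}
```

In plain Scala a `lazy val` already gives once-only, thread-safe initialisation; an explicit holder like this is only needed when the initialiser takes an argument, such as the `SQLContext` passed to `getInstance`.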
Bernhard
Quoting Mohammed Guller <moham...@glassbeam.com>:
You may have better luck with this question on the Spark Cassandra
Connector mailing list.
One quick question about this code from your email:
// Load DataFrame from C* data-source
val base_data = base_data_df.getInstance(sqlContext)
What exactly is base_data_df and how are you creating it?
Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames
All,
I'm new to Spark and I'm having a hard time doing a simple join of two
DFs.
Intent:
- I'm receiving data from Kafka via a direct stream and would like to
enrich the messages with data from Cassandra. The Kafka messages
(Protobufs) are decoded into DataFrames and then joined with a
(supposedly pre-filtered) DF from Cassandra. The ratio of (Kafka)
streaming batch size to raw C* data is [several streaming messages to
millions of C* rows], BUT the join always yields exactly ONE result
[1:1] per message. After the join, the resulting DF is stored in
another C* table.
Problem:
- Even though I'm joining the two DFs on the full Cassandra primary
key and pushing the corresponding filter to C*, it seems that Spark is
loading the whole C* data set into memory before actually joining
(which I'd like to prevent by using filter/predicate pushdown).
This leads to a lot of shuffling and many tasks being spawned, hence
the "simple" join takes forever...
Could anyone shed some light on this? In my view, this should be a
prime example for DFs and Spark Streaming.
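The cost difference described in the problem statement can be illustrated without Spark. The hypothetical sketch below treats the Cassandra table as an in-memory map keyed by the full primary key and contrasts two strategies: reading every row and then joining (what the observed behaviour amounts to), versus fetching only the keys present in the micro-batch (what a pushed-down predicate would allow). `BatchMsg`, `TableRow`, `scanThenJoin` and `lookupJoin` are invented names; the returned row counts stand in for data read from Cassandra.

```scala
// Hypothetical model: the "table" is an in-memory map keyed by (prim1_id, prim2_id).
final case class BatchMsg(prim1Id: Long, prim2Id: Long, payload: String)
final case class TableRow(prim1Id: Long, prim2Id: Long, data: String)

object JoinCost {
  type Key = (Long, Long)

  /** Reads every row, then keeps those matching a message. Returns (joined, rowsRead). */
  def scanThenJoin(batch: Seq[BatchMsg], table: Map[Key, TableRow]): (Seq[(BatchMsg, TableRow)], Int) = {
    val allRows = table.values.toSeq // full table scan
    val byKey = allRows.map(r => (r.prim1Id, r.prim2Id) -> r).toMap
    val joined = batch.flatMap(m => byKey.get((m.prim1Id, m.prim2Id)).toList.map(r => (m, r)))
    (joined, allRows.size)
  }

  /** Fetches only the distinct keys occurring in the batch. Returns (joined, rowsRead). */
  def lookupJoin(batch: Seq[BatchMsg], table: Map[Key, TableRow]): (Seq[(BatchMsg, TableRow)], Int) = {
    val keys = batch.map(m => (m.prim1Id, m.prim2Id)).distinct
    val fetched = keys.flatMap(k => table.get(k).toList.map(row => k -> row)).toMap
    val joined = batch.flatMap(m => fetched.get((m.prim1Id, m.prim2Id)).toList.map(r => (m, r)))
    (joined, fetched.size)
  }
}
```

Both strategies produce the same one-row-per-message result; only the number of rows read differs, and with millions of C* rows per handful of messages that difference dominates.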
Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2
Code:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
// MyMsg, MyMsgDecoder and SQLContextSingleton are defined elsewhere in the project

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg,
    StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>
    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Map the (key, message) pairs to RDD[MyMsg], dropping the Kafka key
    // (a lowercase name is needed: an uppercase name in a pattern is
    // matched as an existing value instead of being bound)
    val MyMsgRdd = rdd.map { case (_, msg) => msg }
    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data source
    val base_data = base_data_df.getInstance(sqlContext)

    // Left outer join on prim1_id and prim2_id
    val joinedDf = MyMsgDf.join(base_data,
      MyMsgDf("prim1_id") === base_data("prim1_id") &&
      MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields
    // Persist
  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
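The join in the code passes "left" as the join type, so every message is kept even when no Cassandra row matches (the Cassandra columns are then null), whereas an inner join drops such messages. The subsequent `.filter(base_data(...).isin(...))` evaluates to null for those unmatched rows and therefore discards them, so the combined expression returns the same rows as an inner join. The hypothetical, Spark-free sketch below shows this on a composite key, with `Option` standing in for SQL null; `Event`, `Ref` and `JoinTypes` are invented names.

```scala
// Hypothetical model of inner vs left outer join on the composite key (prim1_id, prim2_id).
final case class Event(prim1Id: Long, prim2Id: Long)
final case class Ref(prim1Id: Long, prim2Id: Long, data: String)

object JoinTypes {
  private def eventKey(e: Event): (Long, Long) = (e.prim1Id, e.prim2Id)
  private def refKey(r: Ref): (Long, Long) = (r.prim1Id, r.prim2Id)

  /** Inner join: events without a matching Ref are dropped. */
  def innerJoin(events: Seq[Event], refs: Seq[Ref]): Seq[(Event, Ref)] = {
    val index = refs.groupBy(refKey)
    events.flatMap(e => index.getOrElse(eventKey(e), Seq.empty[Ref]).map(r => (e, r)))
  }

  /** Left outer join: every event is kept; None stands in for null right-hand columns. */
  def leftJoin(events: Seq[Event], refs: Seq[Ref]): Seq[(Event, Option[Ref])] = {
    val index = refs.groupBy(refKey)
    events.flatMap { e =>
      index.get(eventKey(e)) match {
        case Some(matches) => matches.map(r => (e, Option(r)))
        case None          => Seq((e, Option.empty[Ref]))
      }
    }
  }
}
```

Keeping only the left-join rows whose right side is present reproduces the inner join, which mirrors what the filter on `base_data` columns does in the code.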
SO:
http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org