Hi Mohammed

Thanks for the hint, I should probably do that :)

As for the DF singleton:

/**
 * Lazily instantiated singleton instance of base_data DataFrame
 */
object base_data_df {

  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
    if (instance == null) {
      // Load DataFrame with C* data-source
      instance = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "cf", "keyspace" -> "ks"))
        .load()
    }
    instance
  }
}
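
And in case it clarifies what I meant by "pre-filtered": inside foreachRDD the idea is roughly the following (sketch only; it collects the batch keys to the driver and filters with literal values, which is what I'd expect the connector to be able to push down -- MyMsgDf is the per-batch DataFrame from the streaming code below):

// Sketch: pre-filter the shared C* DataFrame with the literal key values
// of the current micro-batch before joining.
val base_data = base_data_df.getInstance(sqlContext)

// Collect the (small) set of keys of this batch to the driver
val prim1Keys = MyMsgDf.select("prim1_id").distinct().collect().map(_.get(0))
val prim2Keys = MyMsgDf.select("prim2_id").distinct().collect().map(_.get(0))

// isin with literal values gives the source a pushable predicate
val filteredBase = base_data
  .filter(base_data("prim1_id").isin(prim1Keys: _*))
  .filter(base_data("prim2_id").isin(prim2Keys: _*))

val joinedDf = MyMsgDf.join(filteredBase,
  MyMsgDf("prim1_id") === filteredBase("prim1_id") &&
  MyMsgDf("prim2_id") === filteredBase("prim2_id"))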

Bernhard

Quoting Mohammed Guller <moham...@glassbeam.com>:

You may have better luck with this question on the Spark Cassandra Connector mailing list.



One quick question about this code from your email:

       // Load DataFrame from C* data-source
       val base_data = base_data_df.getInstance(sqlContext)



What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>



-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames



All,



I'm new to Spark and I'm having a hard time doing a simple join of two DFs.



Intent:

- I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DataFrame from Cassandra. The relation of (Kafka) streaming batch size to raw C* data is [several streaming messages to millions of C* rows], BUT the join always yields exactly ONE result [1:1] per message. After the join, the resulting DataFrame is eventually stored to another C* table.



Problem:

- Even though I'm joining the two DFs on the full Cassandra primary key and pushing the corresponding filter to C*, it seems that Spark is loading the whole C* data set into memory before actually joining (which is exactly what I'd like to prevent by using the filter/predicate pushdown). This leads to a lot of shuffling and tasks being spawned, hence the "simple" join takes forever...
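
My understanding is that data-source pushdown only covers filters against literal values, not the column-to-column comparison produced by a join, which might explain the behaviour. A quick way to check what actually reaches the C* source is to look at the physical plan of a filtered read (sketch only; base_data is the DataFrame loaded from the C* source in the code below, and 42 is just a placeholder value):

// Sketch: inspect the physical plan of a read with a literal predicate.
// The extended plan shows which filters were handed to the Cassandra source;
// a join condition comparing two columns will not appear there.
val probe = base_data.filter(base_data("prim1_id") === 42)
probe.explain(true)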



Could anyone shed some light on this? In my view this should be a prime example for DataFrames and Spark Streaming.



Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2



Code:



def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")

  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka direct stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](
    ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>

    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Drop the Kafka key, keep the decoded message
    val MyMsgRdd = rdd.map { case (key, msg) => msg }

    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data-source
    val base_data = base_data_df.getInstance(sqlContext)

    // Join on prim1_id and prim2_id (left outer)
    val joinedDf = MyMsgDf.join(base_data,
        MyMsgDf("prim1_id") === base_data("prim1_id") &&
        MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields

    // Persist

  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
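
For the "// Persist" step at the end, the plan is to write the joined result back to another C* table via the connector's data source, roughly like this (sketch only; the target table name result_cf is made up):

// Sketch: store the joined DataFrame in another Cassandra table
// (table/keyspace names and the selected columns are placeholders).
import org.apache.spark.sql.SaveMode

joinedDf
  .select("prim1_id", "prim2_id" /* , further fields */)
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "result_cf", "keyspace" -> "ks"))
  .mode(SaveMode.Append)
  .save()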



SO:

http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p








---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
