Hi All,

I am facing a weird situation which is explained below.

Scenario and problem: I want to add two attributes to a JSON object based on 
look-up table values and insert the JSON into MongoDB. I have a broadcast 
variable that holds the look-up table. However, I am not able to access it 
inside foreachPartition, as you can see in the code. It does not give me any 
error; it simply does not display anything. Because of this, I also cannot 
insert the JSON into MongoDB. I cannot find any explanation for this 
behaviour. Any explanation or workaround to make it work is much appreciated.
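
To make the intended enrichment concrete, the look-up step can be sketched on 
plain Scala collections, with no Spark or MongoDB involved. This is only an 
illustration under assumptions: the LookupEntry case class, the LookupSketch 
object and the squared-distance helper are hypothetical stand-ins, not the 
real look-up table schema and not GeoHash.getDistance.

```scala
// Hypothetical stand-in for one row of the look-up table.
final case class LookupEntry(geoCode: String, lat: Double, lon: Double,
                             trackKm: Double, trackName: String)

object LookupSketch {
  // Squared Euclidean distance in degrees; a placeholder for a real
  // geographic distance, used here only to rank candidates.
  def squaredDistance(lat1: Double, lon1: Double,
                      lat2: Double, lon2: Double): Double = {
    val dLat = lat1 - lat2
    val dLon = lon1 - lon2
    dLat * dLat + dLon * dLon
  }

  // Closest entry whose geoCode starts with the given prefix, if any.
  def nearest(table: Seq[LookupEntry], prefix: String,
              lat: Double, lon: Double): Option[LookupEntry] = {
    val candidates = table.filter(_.geoCode.startsWith(prefix))
    if (candidates.isEmpty) None
    else Some(candidates.minBy(e => squaredDistance(lat, lon, e.lat, e.lon)))
  }

  // The two attributes to add to the JSON object; falls back to "NULL"
  // when nothing matches, mirroring the full code in this post.
  def attributes(table: Seq[LookupEntry], prefix: String,
                 lat: Double, lon: Double): (String, String) =
    nearest(table, prefix, lat, lon) match {
      case Some(e) => (e.trackKm.toString, e.trackName)
      case None    => ("NULL", "NULL")
    }
}
```

For example, LookupSketch.attributes(table, prefix, latitude, longitude) 
returns the pair of values that would be put into the JSON object before the 
insert.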

Note: I am using Spark Streaming, and the files arrive in HDFS as 
micro-batches. I want to be able to use more cores in Spark Streaming (I have 
16 cores available) so that processing can be done faster.

Thanks.

Regards,
Prajwol

Here is my full code:

// Spark imports used by the code below.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Assumed from the calls used below (Casbah, MongoDB Java driver, org.json);
// adjust to the actual dependencies.
import com.mongodb.DBObject
import com.mongodb.casbah.MongoClient
import com.mongodb.util.JSON
import org.json.JSONObject
// GeoHash, MapInput and Constants are project-local classes (not shown).

object ProcessMicroBatchStreams {
val calculateDistance = udf { 
 (lat: String, lon: String) =>      
 GeoHash.getDistance(lat.toDouble, lon.toDouble) }
 val DB_NAME = "IRT"
 val COLLECTION_NAME = "sensordata"
 val records = Array[String]()

def main(args: Array[String]): Unit = {
  if (args.length < 0) {
  System.err.println("Usage: ProcessMicroBatchStreams <master> <input_directory>")
  System.exit(1)
}
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName(this.getClass.getCanonicalName)
  .set("spark.hadoop.validateOutputSpecs", "false")
/*.set("spark.executor.instances", "3")
.set("spark.executor.memory", "18g")
.set("spark.executor.cores", "9")
.set("spark.task.cpus", "1")
.set("spark.driver.memory", "10g")*/

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc)
  .persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)


ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
  .foreachRDD { rdd =>
  //broadcastTable.value.show() // I can access broadcast value here
  if (!rdd.partitions.isEmpty) {
    val partitionedRDD = rdd.repartition(4)
    partitionedRDD.foreachPartition {
      partition =>
        println("Inside Partition")
        broadcastTable.value.show() // I cannot access broadcast value here
        partition.foreach {
          row =>
            val items = row.split("\n")
            items.foreach { item =>
              val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME)
              val jsonObject = new JSONObject(item)
              val latitude = jsonObject.getDouble(Constants.LATITUDE)
              val longitude = jsonObject.getDouble(Constants.LONGITUDE)

              // The broadcast value is not being shown here
              // However, there is no error shown
              // I cannot insert the value into Mongo DB
              val selectedRow = broadcastTable.value
                .filter("geoCode LIKE '" + GeoHash.subString(latitude, longitude) + "%'")
                .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                .orderBy("Distance")
                .select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1)
              if (selectedRow.length != 0) {
                jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0))
                jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1))
              }
              else {
                jsonObject.put(Constants.TRACK_KM, "NULL")
                jsonObject.put(Constants.TRACK_NAME, "NULL")
              }
              val record = JSON.parse(jsonObject.toString()).asInstanceOf[DBObject]
              mongoColl.insert(record)
            }
        }
    }
  }
}
sys.addShutdownHook {
  ssc.stop(true, true)
}

ssc.start()
ssc.awaitTermination()
}
}
