I think what you’ll want is to apply the .map transformations before the 
foreachRDD, something like:

    val lines = ssc.textFileStream("/stream")
      .map(Sensor.parseSensor)
      .map(Sensor.convertToPut)

    lines.foreachRDD { rdd =>
      // parsing and Put conversion are already set up as distributed
      // transformations, so here we just write the batch to HBase
      rdd.saveAsHadoopDataset(jobConfig)
    }

This will perform the bulk of the work as distributed processing on the 
executors. The function passed to foreachRDD does run on the driver, but all it 
does there is kick off the save action; the actual write to HBase still happens 
in parallel on the executors, not on the driver.
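
To make the driver/executor split concrete, here is a minimal annotated sketch 
(the println is purely illustrative):

    lines.foreachRDD { rdd =>
      // this closure body runs on the driver, once per batch interval
      println(s"writing batch for RDD ${rdd.id}")

      // ...but saveAsHadoopDataset is an RDD action, so the write itself
      // runs on the executors, one task per partition
      rdd.saveAsHadoopDataset(jobConfig)
    }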

Thanks,
Ewan

From: Carol McDonald [mailto:cmcdon...@maprtech.com]
Sent: 28 August 2015 15:30
To: user <user@spark.apache.org>
Subject: correct use of DStream foreachRDD

I would like to make sure that I am using the DStream foreachRDD operation 
correctly. I would like to read from a DStream, transform the input, and write 
to HBase. The code below works, but I became confused when I read the note 
"the function func is executed in the driver process".


    val lines = ssc.textFileStream("/stream")

    lines.foreachRDD { rdd =>
      // parse each line of data into a sensor object
      val sensorRDD = rdd.map(Sensor.parseSensor)

      // convert the sensor data to Put objects and write them to the
      // HBase table column family "data"
      new PairRDDFunctions(sensorRDD.map(Sensor.convertToPut))
        .saveAsHadoopDataset(jobConfig)
    }
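
For reference, the jobConfig used in both snippets is assumed to be the usual 
HBase TableOutputFormat setup, something along these lines (the table name 
"sensor" is a placeholder):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    // point the old-API TableOutputFormat at the target table;
    // "sensor" is an illustrative table name
    val conf = HBaseConfiguration.create()
    val jobConfig = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "sensor")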
