Re: correct use of DStream foreachRDD
Thanks, this looks better:

    // parse the lines of data into sensor objects
    val sensorDStream = ssc.textFileStream("/stream").map(Sensor.parseSensor)

    sensorDStream.foreachRDD { rdd =>
      // filter sensor data for low psi
      val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to Put objects and write to HBase
      rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

      // convert alert data to Put objects and write to HBase CF alert
      alertRDD.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
    }

    // Start the computation
    ssc.start()

On Fri, Aug 28, 2015 at 10:59 AM, Sean Owen wrote:
> Yes, for example "val sensorRDD = rdd.map(Sensor.parseSensor)" is a
> line of code executed on the driver; it's part of the function you
> supplied to foreachRDD. However, that line defines an operation on an
> RDD, and the map function you supplied (parseSensor) will ultimately
> be carried out on the cluster.
>
> If you mean: is the bulk of the work (the Sensor.* methods) happening
> on the cluster? Yes.
>
> Ewan's version looks cleaner, though it will ultimately be equivalent
> and doesn't cause operations to happen in a different place.
>
> (PS: I don't think you need "new PairRDDFunctions"; the implicits it
> defines should be automatically available.
> "sensorRDD.map(Sensor.convertToPut)" should be sufficient. In slightly
> older versions of Spark you have to import SparkContext._ to get these
> implicits.)
>
> On Fri, Aug 28, 2015 at 3:29 PM, Carol McDonald wrote:
> > I would like to make sure that I am using the DStream foreachRDD
> > operation correctly. I would like to read from a DStream, transform
> > the input, and write to HBase. The code below works, but I became
> > confused when I read "Note that the function func is executed in the
> > driver process".
> >
> >     val lines = ssc.textFileStream("/stream")
> >
> >     lines.foreachRDD { rdd =>
> >       // parse the line of data into a sensor object
> >       val sensorRDD = rdd.map(Sensor.parseSensor)
> >
> >       // convert sensor data to Put objects and write to HBase
> >       // table column family data
> >       new PairRDDFunctions(sensorRDD.map(Sensor.convertToPut)).
> >         saveAsHadoopDataset(jobConfig)
> >     }
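The per-batch logic inside that foreachRDD (parse, filter low readings, write both sets) can be sketched with a plain Scala collection standing in for the RDD. This is only an illustration: the Sensor fields, the "id,psi" line format, and the 5.0 psi threshold are assumptions, not the real Sensor class from the thread.

    // Hypothetical stand-in for the Sensor class used in the thread
    case class Sensor(id: String, psi: Double)

    object SensorDemo {
      // Stand-in for Sensor.parseSensor: parses a "id,psi" line
      def parseSensor(line: String): Sensor = {
        val parts = line.split(",")
        Sensor(parts(0), parts(1).toDouble)
      }

      // The filter from the foreachRDD body: keep low-psi readings
      def lowPsi(sensors: Seq[Sensor]): Seq[Sensor] =
        sensors.filter(_.psi < 5.0)
    }

    // One "batch" of lines, as a foreachRDD invocation would see them
    val batch = Seq("pump1,7.2", "pump2,3.1", "pump3,4.9").map(SensorDemo.parseSensor)
    val alerts = SensorDemo.lowPsi(batch)

In the real job, note that `rdd` is consumed by two output actions (the full write and the alert write), so calling `rdd.cache()` before the first write avoids re-reading and re-parsing the batch for the second one.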
Re: correct use of DStream foreachRDD
Yes, for example "val sensorRDD = rdd.map(Sensor.parseSensor)" is a line of code executed on the driver; it's part of the function you supplied to foreachRDD. However, that line defines an operation on an RDD, and the map function you supplied (parseSensor) will ultimately be carried out on the cluster.

If you mean: is the bulk of the work (the Sensor.* methods) happening on the cluster? Yes.

Ewan's version looks cleaner, though it will ultimately be equivalent and doesn't cause operations to happen in a different place.

(PS: I don't think you need "new PairRDDFunctions"; the implicits it defines should be automatically available. "sensorRDD.map(Sensor.convertToPut)" should be sufficient. In slightly older versions of Spark you have to import SparkContext._ to get these implicits.)

On Fri, Aug 28, 2015 at 3:29 PM, Carol McDonald wrote:
> I would like to make sure that I am using the DStream foreachRDD
> operation correctly. I would like to read from a DStream, transform
> the input, and write to HBase. The code below works, but I became
> confused when I read "Note that the function func is executed in the
> driver process".
>
>     val lines = ssc.textFileStream("/stream")
>
>     lines.foreachRDD { rdd =>
>       // parse the line of data into a sensor object
>       val sensorRDD = rdd.map(Sensor.parseSensor)
>
>       // convert sensor data to Put objects and write to HBase
>       // table column family data
>       new PairRDDFunctions(sensorRDD.map(Sensor.convertToPut)).
>         saveAsHadoopDataset(jobConfig)
>     }
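Sean's PS is about Spark's implicit conversion that enriches an RDD of pairs with the PairRDDFunctions methods (such as saveAsHadoopDataset), so wrapping by hand is redundant once the implicit is in scope. The same language mechanism can be modeled with a plain implicit class over Seq[(K, V)]; the reduceByKey here is a toy analogue for illustration, not Spark's implementation.

    object PairOps {
      // Analogue of Spark's rddToPairRDDFunctions: once imported, pair-only
      // methods appear on any Seq[(K, V)] without explicit wrapping.
      implicit class PairSeqFunctions[K, V](self: Seq[(K, V)]) {
        // Toy stand-in for a pair-only method like reduceByKey
        def reduceByKey(f: (V, V) => V): Map[K, V] =
          self.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }
      }
    }

    import PairOps._

    // No "new PairSeqFunctions(...)" needed; the implicit supplies the method
    val counts = Seq(("a", 1), ("b", 2), ("a", 3)).reduceByKey(_ + _)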
RE: correct use of DStream foreachRDD
I think what you'll want is to carry out the .map functions before the foreachRDD, something like:

    val lines = ssc.textFileStream("/stream").
      map(Sensor.parseSensor).
      map(Sensor.convertToPut)

    lines.foreachRDD { rdd =>
      // the data is already parsed and converted; just write it to HBase
      rdd.saveAsHadoopDataset(jobConfig)
    }

This will perform the bulk of the work in the distributed processing, before the results are written to HBase.

Thanks,
Ewan

From: Carol McDonald [mailto:cmcdon...@maprtech.com]
Sent: 28 August 2015 15:30
To: user
Subject: correct use of DStream foreachRDD

I would like to make sure that I am using the DStream foreachRDD operation correctly. I would like to read from a DStream, transform the input, and write to HBase. The code below works, but I became confused when I read "Note that the function func is executed in the driver process".

    val lines = ssc.textFileStream("/stream")

    lines.foreachRDD { rdd =>
      // parse the line of data into a sensor object
      val sensorRDD = rdd.map(Sensor.parseSensor)

      // convert sensor data to Put objects and write to HBase
      // table column family data
      new PairRDDFunctions(sensorRDD.map(Sensor.convertToPut)).
        saveAsHadoopDataset(jobConfig)
    }
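Ewan's point — that chaining the .map calls before the foreach keeps them lazy and composable, with only the terminal write actually driving the work — can be sketched with a Scala Iterator, which is lazy in the same way RDD transformations are. The parse and toPut functions here are hypothetical stand-ins for Sensor.parseSensor and Sensor.convertToPut.

    // Hypothetical stand-ins for the Sensor.* methods in the thread
    def parse(line: String): Double = line.split(",")(1).toDouble
    def toPut(psi: Double): String = s"put:$psi"

    val written = scala.collection.mutable.Buffer[String]()

    // Chained maps on an Iterator: nothing is computed yet (lazy, like
    // RDD transformations on the cluster)
    val lines = Iterator("s1,3.0", "s2,6.5").map(parse).map(toPut)

    // Only the terminal foreach (the "output action") forces the work
    lines.foreach(put => written += put)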