Spark newbie here, using Spark 1.3.1.

I’m consuming a stream and trying to pipe the data from the entire window to R 
for analysis.  The R algorithm needs the entire dataset from the stream 
(everything in the window) in order to function properly; it can’t be broken up.

So I tried doing a coalesce(1) before calling pipe(), but it still seems to be 
breaking up the data and invoking R multiple times with small pieces of data.  
Is there some other approach I should try?

Here’s a small snippet:

    val inputs: DStream[String] = MQTTUtils
      .createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
      .window(duration)
    inputs.foreachRDD {
      windowRdd => {
        if (windowRdd.count() > 0) processWindow(windowRdd)
      }
    }

...

  def processWindow(windowRdd: RDD[String]) = {
    // call R script to process data
    windowRdd.coalesce(1)
    val outputsRdd: RDD[String] =
      windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
    outputsRdd.cache()

    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }
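
One possible cause, offered as a sketch rather than a confirmed diagnosis: RDD transformations such as coalesce() return a new RDD and leave the original unchanged, and pipe() starts one external process per partition. In the snippet above the result of coalesce(1) is discarded, so pipe() may still be running against the original partitioning. The helper name pipeWholeWindow and the scriptPath parameter below are hypothetical, introduced only for illustration.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: pipes an entire window through one external process.
// scriptPath is assumed to be the resolved local path of the R script.
def pipeWholeWindow(windowRdd: RDD[String], scriptPath: String): RDD[String] = {
  // coalesce() does not modify windowRdd; it returns a new RDD,
  // so the result has to be captured and used.
  val singlePartition: RDD[String] = windowRdd.coalesce(1)

  // pipe() launches the command once per partition, so a single
  // partition should mean a single invocation of the script.
  singlePartition.pipe(scriptPath)
}
```

Note that funnelling the whole window into one partition means all of its data has to be handled by a single task, so this only makes sense if the window is small enough for that.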

...
