Do you really need to create a DStream from the original messaging queue?
Can't you just read the messages in a while loop on the driver, where the
SQLContext is available, and load each CSV from there?
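
Roughly what I have in mind, as a sketch only. I don't know which queue
client you use, so a BlockingQueue stands in for it here, and DriverLoop,
drain and the STOP sentinel are names I made up. The handler is where you
would parse the JSON and call Spark; I believe something like
spark.read().option(...).csv(s3Url) would do on Spark 2.x, but treat that
comment as an assumption, not tested code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DriverLoop {

    // Hypothetical sentinel that ends the loop; a real queue client would
    // have its own shutdown mechanism.
    static final String STOP = "__STOP__";

    // Polls the queue on the driver and hands each message to the handler.
    // In a real job the handler would parse the JSON metadata and then do
    // something like (assumed, untested):
    //   spark.read().option("delimiter", delimiter).option("header", true)
    //        .option("inferSchema", true).csv(s3Url)
    // That works here because this loop runs on the driver, not in a map().
    static int drain(BlockingQueue<String> queue, Consumer<String> handler) {
        int processed = 0;
        while (true) {
            String message;
            try {
                message = queue.poll(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return processed;
            }
            if (message == null) {
                continue; // nothing arrived yet, keep waiting
            }
            if (STOP.equals(message)) {
                return processed;
            }
            handler.accept(message);
            processed++;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Made-up message; the bucket and file name are placeholders.
        queue.add("{\"url\": \"s3://example-bucket/a.csv\", \"delimiter\": \",\"}");
        queue.add(STOP);
        int n = drain(queue, message -> System.out.println("would load CSV for: " + message));
        System.out.println("processed " + n + " message(s)");
    }
}
```

Each file then becomes its own small Spark job launched from the driver, so
the per-file delimiter and header settings are just local variables in the
handler.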

On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo <m...@piragua.com> wrote:

> Hello,
>
> I have a web application that publishes JSON messages to a messaging
> queue. Each message contains metadata and a link to a CSV document on S3.
> I'd like to iterate over these JSON messages and, for each one, pull the
> CSV document into Spark SQL, transform it (based on the metadata in the
> JSON message), and output the results to a search index.  Each file on S3
> has different headers, potentially different delimiters, and a different
> number of rows.
>
> Basically what I'm trying to do is something like this:
>
>         JavaDStream<ParsedDocument> parsedMetadataAndRows =
>             queueStream.map(new Function<String, ParsedDocument>() {
>             @Override
>             ParsedDocument call(String metadata) throws Exception {
>                 Map gson = new Gson().fromJson(metadata, Map.class)
>
>                 // get metadata from gson
>                 String s3Url = gson.url
>                 String delimiter = gson.delimiter
>                 // etc...
>
>                 // read s3Url
>                 Dataset dataFrame = sqlContext.read()
>                         .format("com.databricks.spark.csv")
>                         .option("delimiter", delimiter)
>                         .option("header", true)
>                         .option("inferSchema", true)
>                         .load(s3Url)
>
>                 // process document,
>                 ParsedDocument docPlusRows = //...
>
>                 return docPlusRows
>             }
>         })
>
>         JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
>             "index/type" // ...
>
>
> But it appears I cannot get access to the sqlContext when I run this on
> the Spark cluster, because that code executes on the executors, not on
> the driver.
>
> Is there a way I can access or create a SQLContext to be able to pull the
> file down from S3 in my map function?  Or do you have any recommendations
> as to how I could set up a streaming job in a different way that would
> allow me to accept metadata on the stream of records coming in and pull
> each file down from S3 for processing?
>
> Thanks in advance for your help!
>
> Mike
>
