Do you really need to create a DStream from the original messaging queue? Can't you just read the messages in a while loop (or something similar) on the driver?
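For example, something along these lines might be enough (a rough, untested sketch -- QueueClient is just a placeholder for whatever client your messaging queue provides, and the "url"/"delimiter" fields and "index/type" resource are taken from your example):

    import com.google.gson.Gson;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

    import java.util.Map;

    public class CsvFromQueueJob {

        // Placeholder for your actual queue client; nextMessage() blocks
        // until the next JSON metadata message arrives.
        interface QueueClient {
            String nextMessage();
        }

        public static void run(QueueClient queue) {
            // Created on the driver, so it stays in scope for the loop below.
            SparkSession spark = SparkSession.builder()
                    .appName("csv-from-queue")
                    .getOrCreate();
            Gson gson = new Gson();

            while (true) {
                String json = queue.nextMessage();

                Map metadata = gson.fromJson(json, Map.class);
                String s3Url = (String) metadata.get("url");
                String delimiter = (String) metadata.get("delimiter");

                // Everything here runs on the driver, so spark.read() is fine.
                // (Use "com.databricks.spark.csv" instead of "csv" on Spark 1.x.)
                Dataset<Row> df = spark.read()
                        .format("csv")
                        .option("delimiter", delimiter)
                        .option("header", "true")
                        .option("inferSchema", "true")
                        .load(s3Url);

                // ... transform df based on the rest of the metadata ...

                JavaEsSparkSQL.saveToEs(df, "index/type");
            }
        }
    }

Each CSV just becomes its own driver-side read, so there is no need to ship the SQL context into a closure at all.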
On Fri, Jun 16, 2017 at 1:01 PM, Mike Hugo <m...@piragua.com> wrote:
> Hello,
>
> I have a web application that publishes JSON messages on to a messaging
> queue that contain metadata and a link to a CSV document on S3. I'd like
> to iterate over these JSON messages, and for each one pull the CSV document
> into spark SQL to transform it (based on the metadata in the JSON message)
> and output the results to a search index. Each file on S3 has different
> headers, potentially different delimiters, and differing numbers of rows.
>
> Basically what I'm trying to do is something like this:
>
> JavaDStream<ParsedDocument> parsedMetadataAndRows =
>     queueStream.map(new Function<String, ParsedDocument>() {
>         @Override
>         ParsedDocument call(String metadata) throws Exception {
>             Map gson = new Gson().fromJson(metadata, Map.class)
>
>             // get metadata from gson
>             String s3Url = gson.url
>             String delimiter = gson.delimiter
>             // etc...
>
>             // read s3Url
>             Dataset dataFrame = sqlContext.read()
>                 .format("com.databricks.spark.csv")
>                 .option("delimiter", delimiter)
>                 .option("header", true)
>                 .option("inferSchema", true)
>                 .load(url)
>
>             // process document,
>             ParsedDocument docPlusRows = //...
>
>             return docPlusRows
>     })
>
> JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows,
>     "index/type" // ...
>
> But it appears I cannot get access to the sqlContext when I run this on
> the spark cluster because that code is executing in the executor not in the
> driver.
>
> Is there a way I can access or create a SqlContext to be able to pull the
> file down from S3 in my map function? Or do you have any recommendations
> as to how I could set up a streaming job in a different way that would
> allow me to accept metadata on the stream of records coming in and pull
> each file down from s3 for processing?
>
> Thanks in advance for your help!
>
> Mike
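If you do want to keep the DStream, another option (again just an untested sketch, using the same assumed "url"/"delimiter" fields and "index/type" resource from your example) is to do the per-message work inside foreachRDD. Its body executes on the driver for each micro-batch, so the SparkSession is usable there, and you only collect the small JSON metadata strings, not the CSV data itself:

    import com.google.gson.Gson;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

    import java.util.Map;

    public class ForeachRddVariant {

        // spark and queueStream are the SparkSession and DStream you already
        // set up on the driver in your job.
        static void wire(SparkSession spark, JavaDStream<String> queueStream) {
            queueStream.foreachRDD(rdd -> {
                // Runs on the driver once per batch; only the JSON metadata
                // strings are collected, the CSVs are still read by Spark.
                for (String json : rdd.collect()) {
                    Map metadata = new Gson().fromJson(json, Map.class);
                    String s3Url = (String) metadata.get("url");
                    String delimiter = (String) metadata.get("delimiter");

                    Dataset<Row> df = spark.read()
                            .format("csv")
                            .option("delimiter", delimiter)
                            .option("header", "true")
                            .option("inferSchema", "true")
                            .load(s3Url);

                    // ... transform per the metadata, then index ...
                    JavaEsSparkSQL.saveToEs(df, "index/type");
                }
            });
        }
    }

The collect() is only pulling back the metadata messages for that batch, so it stays cheap as long as each batch carries a handful of small JSON documents.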