Hello, I have a web application that publishes JSON messages onto a messaging queue; each message contains metadata and a link to a CSV document on S3. I'd like to iterate over these JSON messages and, for each one, pull the CSV document into Spark SQL, transform it based on the metadata in the JSON message, and output the results to a search index. Each file on S3 has different headers, potentially a different delimiter, and a differing number of rows.
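For reference, each message looks roughly like this (the bucket and values here are just illustrative; the real messages carry the S3 URL, the delimiter, and a few other fields):

{
  "url": "s3://some-bucket/path/to/document.csv",
  "delimiter": "|",
  ...
}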
Basically what I'm trying to do is something like this:

JavaDStream<ParsedDocument> parsedMetadataAndRows = queueStream.map(new Function<String, ParsedDocument>() {
    @Override
    public ParsedDocument call(String metadata) throws Exception {
        // get metadata from gson
        Map gson = new Gson().fromJson(metadata, Map.class);
        String s3Url = (String) gson.get("url");
        String delimiter = (String) gson.get("delimiter");
        // etc...

        // read s3Url
        Dataset<Row> dataFrame = sqlContext.read()
            .format("com.databricks.spark.csv")
            .option("delimiter", delimiter)
            .option("header", "true")
            .option("inferSchema", "true")
            .load(s3Url);

        // process document
        ParsedDocument docPlusRows = // ...
        return docPlusRows;
    }
});

JavaEsSparkStreaming.saveToEs(parsedMetadataAndRows, "index/type"); // ...

But it appears I cannot get access to the sqlContext when I run this on the Spark cluster, because that code executes on the executors rather than on the driver. Is there a way I can access or create a SQLContext in my map function so I can pull the file down from S3? Or do you have any recommendations for setting up the streaming job differently, so that I could accept the metadata on the incoming stream of records and pull each file down from S3 for processing?

Thanks in advance for your help!

Mike
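P.S. One idea I've been toying with, in case it helps frame the question: moving the reads into foreachRDD so the DataFrame code runs on the driver instead of on the executors. This is only a rough, untested sketch; it assumes the metadata messages are small enough to collect() back to the driver, and it reuses the queueStream and sqlContext from above:

import java.util.Map;
import com.google.gson.Gson;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

queueStream.foreachRDD(rdd -> {
    // foreachRDD runs this body on the driver, so sqlContext is in scope;
    // collect() only pulls back the small metadata strings, not the CSV data
    for (String metadata : rdd.collect()) {
        Map gson = new Gson().fromJson(metadata, Map.class);
        String s3Url = (String) gson.get("url");
        String delimiter = (String) gson.get("delimiter");

        // the CSV read itself is still distributed across the executors
        Dataset<Row> dataFrame = sqlContext.read()
            .format("com.databricks.spark.csv")
            .option("delimiter", delimiter)
            .option("header", "true")
            .option("inferSchema", "true")
            .load(s3Url);

        // transform dataFrame based on the metadata, then write it out,
        // e.g. with JavaEsSparkSQL.saveToEs(dataFrame, "index/type")
    }
});

Would that be a reasonable pattern, or does collecting the metadata back to the driver defeat the purpose of the streaming job?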