Thanks. Sorry, the last section was supposed to be:

streams.par.foreach { nameAndStream =>
  nameAndStream._2.foreachRDD { rdd =>
    val df = sqlContext.jsonRDD(rdd)
    df.insertInto(nameAndStream._1)
  }
}

ssc.start()

On Fri, Jul 24, 2015 at 10:39 AM, Dean Wampler <deanwamp...@gmail.com> wrote:

> You don't need the "par" (parallel) versions of the Scala collections,
> actually. Recall that you are building a pipeline in the driver, but it
> doesn't start running cluster tasks until ssc.start() is called, at which
> point Spark will figure out the task parallelism. In fact, you might as
> well do the foreachRDD call within the initial map. No need for the streams
> collection, unless you need it for something else. Test it out to make sure
> I'm not wrong ;)
>
> However, I'm a little confused by the per-stream logic. It looks like
> you're using foreachRDD to dump each input stream into the same output
> location "stream._1". True? If it's a directory, you'll get an error that
> it already exists for the *second* stream in "streams". If you're just
> funneling all 500 inputs into the same output location, how about using
> DStream.union to combine all the input streams into one, then have one
> foreachRDD to write output?
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Fri, Jul 24, 2015 at 11:23 AM, Brandon White <bwwintheho...@gmail.com>
> wrote:
>
>> Hello,
>>
>> So I have about 500 Spark Streams, and I want to know the fastest and
>> most reliable way to process each of them. Right now, I am creating and
>> processing them in a list:
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val streams = paths.par.map { nameAndPath =>
>>   (nameAndPath._1, ssc.textFileStream(nameAndPath._2))
>> }
>>
>> streams.par.foreach { nameAndStream =>
>>   nameAndStream._2.foreachRDD { rdd =>
>>     val df = sqlContext.jsonRDD(rdd)
>>     df.insertInto(nameAndStream._1)
>>   }
>> }
>>
>> ssc.start()
>>
>> Is this the best way to do this? Are there any better, faster methods?
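
[Editor's note] A minimal sketch of the union approach Dean suggests, assuming all 500 inputs really do go to one output table. The `paths` collection, the ambient `sc`/`sqlContext`, and the "events" table name are illustrative assumptions, not part of the original thread; `jsonRDD` was the Spark 1.x SQLContext API:

    import org.apache.spark.streaming.{Minutes, StreamingContext}

    val ssc = new StreamingContext(sc, Minutes(10))

    // Plain `map` suffices: this only wires up the pipeline in the driver;
    // nothing runs on the cluster until ssc.start().
    val streams = paths.map { case (name, path) => ssc.textFileStream(path) }

    // Combine all input DStreams into one, then use a single foreachRDD
    // to write the output.
    val combined = ssc.union(streams)

    combined.foreachRDD { rdd =>
      val df = sqlContext.jsonRDD(rdd)
      df.insertInto("events")  // hypothetical single output table
    }

    ssc.start()

If the streams must land in *different* tables (one per name), union won't help, and the per-stream foreachRDD in the original code is the way to go.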