couple things to add here:

1) you can import the
org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds
a whole ton of functionality to DStream itself.  this lets you work at the
DStream level versus digging into the underlying RDDs.

2) you can use ssc.fileStream(directory) to create an input stream made up
of files in a given directory.  new files will be added to the stream as
they appear in that directory.  note:  files must be immutable.


On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls <bfa...@outlook.com> wrote:

> Thanks Sean! I got that working last night similar to how you solved it.
> Any ideas about how to monitor that same folder in another script by
> creating a stream? I can use sc.sequenceFile() to read in the RDD, but how
> do I get the name of the file that got added since there is no
> sequenceFileStream() method? Thanks again for your help.
>
> > On Jul 22, 2014, at 1:57, "Sean Owen" <so...@cloudera.com> wrote:
> >
> > What about simply:
> >
> > dstream.foreachRDD(_.saveAsSequenceFile(...))
> >
> > ?
> >
> >> On Tue, Jul 22, 2014 at 2:06 AM, Barnaby <bfa...@outlook.com> wrote:
> >> First of all, I do not know Scala, but learning.
> >>
> >> I'm doing a proof of concept by streaming content from a socket,
> counting
> >> the words and write it to a Tachyon disk. A different script will read
> the
> >> file stream and print out the results.
> >>
> >> val lines = ssc.socketTextStream(args(0), args(1).toInt,
> >> StorageLevel.MEMORY_AND_DISK_SER)
> >> val words = lines.flatMap(_.split(" "))
> >> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> >> wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
> >> ssc.start()
> >> ssc.awaitTermination()
> >>
> >> I already did a proof of concept to write and read sequence files but
> there
> >> doesn't seem to be a saveAsSequenceFiles() method in DStream. What is
> the
> >> best way to write out an RDD to a stream so that the timestamps are in
> the
> >> filenames and so there is minimal overhead in reading the data back in
> as
> >> "objects", see below.
> >>
> >> My simple successful proof was the following:
> >> val rdd =  sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
> >> rdd.saveAsSequenceFile("tachyon://.../123.sf2")
> >> val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")
> >>
> >> How can I do something similar with streaming?
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Reply via email to