Andy,

Using rdd.saveAsTextFile(...) will overwrite the data if your target is
the same path: every batch writes to hdfs:///rawSteamingData, so each
streaming interval clobbers the output of the previous one.
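
If you prefer to keep your foreachRDD approach, the usual fix is to give
every batch its own output directory. A minimal sketch, assuming the same
Spark 1.x Java API your code already uses (the Function2 overload of
foreachRDD hands you the batch Time):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Time;

    data.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
        @Override
        public Void call(JavaRDD<String> jsonStr, Time time) throws Exception {
            // one directory per 5s batch, keyed by the batch time in ms,
            // so no batch ever clobbers the output of another
            jsonStr.saveAsTextFile("hdfs:///rawSteamingData/batch-" + time.milliseconds());
            return null;
        }
    });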

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix), which writes a new set of files at each streaming interval, so
nothing gets overwritten. Since that produces many small files, if you want
to increase the file size (usually a good idea in HDFS) you can apply a
window function over the dstream and save the 'windowed' dstream instead,
as sketched below.
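
For example, a sketch against the JavaDStream<String> 'data' and the
5-second batch interval from your code (saveAsTextFiles lives on the
underlying Scala DStream, reached via dstream()):

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // one directory per batch, named <prefix>-<TIME_IN_MS>.<suffix>
    data.dstream().saveAsTextFiles("hdfs:///rawSteamingData/raw", "txt");

    // fewer, larger files: a 60s tumbling window (window length and slide
    // must both be multiples of the batch interval) writes one set of
    // files per minute instead of twelve
    JavaDStream<String> windowed =
        data.window(new Duration(60 * 1000), new Duration(60 * 1000));
    windowed.dstream().saveAsTextFiles("hdfs:///rawSteamingData/windowed", "txt");

The output directory names are keyed by the batch time, so successive
intervals never collide.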

Kind regards, Gerard.

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new Spark Streaming project. In this phase of the system
> all we want to do is save the data we receive to HDFS. After running for
> a couple of days it looks like I am missing a lot of data. I wonder if
> saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I
> captured in previous windows? I noticed that after running for a couple of
> days my HDFS file system has 25 files, with names like "part-00006". I
> used 'hadoop fs -dus' to check the total data captured. While the system
> was running I would periodically call 'dus', and I was surprised that
> sometimes the total number of bytes actually dropped.
>
>
> Is there a better way to write my data to disk?
>
> Any suggestions would be appreciated.
>
> Andy
>
>
> public static void main(String[] args) {
>     SparkConf conf = new SparkConf().setAppName(appName);
>     JavaSparkContext jsc = new JavaSparkContext(conf);
>     JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));
>
>     [ deleted code …]
>
>     data.foreachRDD(new Function<JavaRDD<String>, Void>() {
>         private static final long serialVersionUID = -7957854392903581284L;
>
>         @Override
>         public Void call(JavaRDD<String> jsonStr) throws Exception {
>             // /rawSteamingData is a directory
>             jsonStr.saveAsTextFile("hdfs:///rawSteamingData");
>             return null;
>         }
>     });
>
>     ssc.checkpoint(checkPointUri);
>
>     ssc.start();
>     ssc.awaitTermination();
> }
>
