Not sure if it would be the most efficient, but you could treat the
filesystem as a key-value store and write each batch to its own
sub-directory, where the directory name is derived from the batch time.
If the directory already exists, skip the write; see the sketch below.
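
A rough, untested sketch of what I mean, reusing the save_rdd /
hdfs_dest_directory / log_schema names from your snippet below. The
"ignore" save mode skips the write when the target already contains
data, which gives you the existence check for free (and I believe
PySpark passes foreachRDD's two-argument function the batch time as a
datetime, so you can format it into the path):

    def save_rdd(batch_time, rdd):
        sqlContext = SQLContext(rdd.context)
        df = sqlContext.createDataFrame(rdd, log_schema)
        # Key the output directory on the batch time, so a re-run of
        # the same batch after a restart targets the same path.
        out_dir = "%s/batch=%s" % (hdfs_dest_directory,
                                   batch_time.strftime("%Y-%m-%d-%H%M%S"))
        # SaveMode "ignore": if the directory already exists with data,
        # the write becomes a no-op instead of producing duplicates.
        df.write.mode("ignore").parquet(out_dir)
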
You could then run a follow-up batch job that coalesces the small
per-batch files, in order to "close the day"; for example:
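
Again untested, and closed_dest_directory is a made-up name; the glob
matches the per-batch directories written above for a single day:

    # Read all the small per-batch outputs for one day and rewrite them
    # as fewer, larger files -- "closing the day".
    day = "2015-11-23"
    df = sqlContext.read.parquet("%s/batch=%s-*" % (hdfs_dest_directory, day))
    df.coalesce(8).write.mode("overwrite").parquet(
        "%s/%s" % (closed_dest_directory, day))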

Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:

> Hi all,
>
> I'm working on a project with Spark Streaming; the goal is to process
> log files from S3 and save them to HDFS so I can later analyze them
> with Spark SQL.
> Everything works well except when I kill the Spark application and
> restart it: it picks up from the last processed batch and reprocesses
> it, which results in duplicate data on HDFS.
>
> How can I make the write to HDFS idempotent? I couldn't find a way to
> control, for example, the filenames of the Parquet files being
> written; the idea was to include the batch time so that the same batch
> is always written to the same path.
> I've also tried mode("overwrite"), but it looks like each batch gets
> written to the same file every time.
> Any help would be greatly appreciated.
>
> Thanks,
> Michael
>
> --
>
> from pyspark import SparkContext
> from pyspark.sql import SQLContext
> from pyspark.streaming import StreamingContext
>
> def save_rdd(batch_time, rdd):
>     sqlContext = SQLContext(rdd.context)
>     df = sqlContext.createDataFrame(rdd, log_schema)
>     df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>
> def create_ssc(checkpoint_dir, spark_master):
>     sc = SparkContext(spark_master, app_name)
>     ssc = StreamingContext(sc, batch_interval)
>     ssc.checkpoint(checkpoint_dir)
>
>     # (dstream creation from the S3 source omitted here)
>     parsed = dstream.map(lambda line: log_parser(line))
>     parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
>
>     return ssc
>
> ssc = StreamingContext.getOrCreate(
>     checkpoint_dir, lambda: create_ssc(checkpoint_dir, spark_master))
> ssc.start()
> ssc.awaitTermination()
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
