Not sure if it would be the most efficient, but you could treat the filesystem as a key-value store and write each batch to a sub-directory whose name is the batch time. If the directory already exists, skip the write. A follow-up batch job can then coalesce the files in order to "close the day".
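Something like this, as a rough sketch; the batch=<time> layout, the strftime format, and the use of the private _jvm gateway for the existence check are my assumptions, not an official API (PySpark has no public exists() helper):

from pyspark.sql import SQLContext

def batch_path(batch_time):
    # batch_time arrives as a datetime in PySpark's foreachRDD;
    # turn it into a deterministic, path-safe directory name
    return "%s/batch=%s" % (hdfs_dest_directory,
                            batch_time.strftime("%Y-%m-%d-%H-%M-%S"))

def already_written(sc, path):
    # Go through the JVM Hadoop FileSystem API via py4j. Checking for
    # the _SUCCESS marker instead of the bare directory also guards
    # against output that was only partially written before a kill.
    hadoop = sc._jvm.org.apache.hadoop.fs
    fs = hadoop.FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs.exists(hadoop.Path(path + "/_SUCCESS"))

def save_rdd(batch_time, rdd):
    path = batch_path(batch_time)
    if already_written(rdd.context, path):
        return  # batch already on HDFS from before the restart; skip
    df = SQLContext(rdd.context).createDataFrame(rdd, log_schema)
    df.write.parquet(path)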
Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:
> Hi all,
>
> I'm working on a project with Spark Streaming; the goal is to process
> log files from S3 and save them on Hadoop to later analyze them with
> SparkSQL. Everything works well except when I kill the Spark
> application and restart it: it picks up from the latest processed
> batch and reprocesses it, which results in duplicate data on HDFS.
>
> How can I make the writing step on HDFS idempotent? I couldn't find
> any way to control, for example, the filenames of the Parquet files
> being written; the idea was to include the batch time so that the
> same batch always gets written to the same path.
> I've also tried mode("overwrite"), but it looks like each batch gets
> written to the same file every time.
> Any help would be greatly appreciated.
>
> Thanks,
> Michael
>
> --
>
> from pyspark import SparkContext
> from pyspark.sql import SQLContext
> from pyspark.streaming import StreamingContext
>
> def save_rdd(batch_time, rdd):
>     sqlContext = SQLContext(rdd.context)
>     df = sqlContext.createDataFrame(rdd, log_schema)
>     df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>
> def create_ssc(checkpoint_dir, spark_master):
>     sc = SparkContext(spark_master, app_name)
>     ssc = StreamingContext(sc, batch_interval)
>     ssc.checkpoint(checkpoint_dir)
>
>     # (creation of dstream from the S3 source was elided here)
>     parsed = dstream.map(lambda line: log_parser(line))
>     parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
>
>     return ssc
>
> ssc = StreamingContext.getOrCreate(checkpoint_dir,
>                                    lambda: create_ssc(checkpoint_dir, spark_master))
> ssc.start()
> ssc.awaitTermination()
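For the "close the day" step I mentioned, a follow-up job along these lines could work; the day=<date> output layout and the target file count are arbitrary choices on my part:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def close_day(day):
    # Read every per-batch directory for one day (the glob matches the
    # batch=YYYY-MM-DD-* layout from the earlier sketch) and rewrite it
    # as a handful of larger files; the per-batch directories can then
    # be deleted.
    sc = SparkContext(spark_master, "close-day")
    sqlContext = SQLContext(sc)
    df = sqlContext.read.parquet("%s/batch=%s-*" % (hdfs_dest_directory, day))
    df.coalesce(8).write.parquet("%s/day=%s" % (hdfs_dest_directory, day))
    sc.stop()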