Hi all, I'm working on a project with Spark Streaming. The goal is to process log files from S3 and save them to HDFS, to analyze them later with Spark SQL. Everything works well, except when I kill the Spark application and restart it: it picks up from the latest processed batch and reprocesses it, which results in duplicate data on HDFS.
How can I make the write step to HDFS idempotent? I couldn't find any way to control, for example, the filenames of the Parquet files being written; the idea would be to include the batch time so that the same batch is always written to the same path. I've also tried mode("overwrite"), but it looks like every batch then gets written to the same location, clobbering the previous one. Any help would be greatly appreciated.

Thanks,
Michael

--

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

def save_rdd(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)

def create_ssc(checkpoint_dir, spark_master):
    sc = SparkContext(spark_master, app_name)
    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)
    # dstream is created from the S3 source here (construction elided)
    parsed = dstream.map(lambda line: log_parser(line))
    parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir, lambda: create_ssc(checkpoint_dir, spark_master))
ssc.start()
ssc.awaitTermination()
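For concreteness, here is a rough sketch of the idea I was hoping to implement: derive a deterministic output directory from the batch time, then overwrite only that directory on a replay, so a reprocessed batch replaces its own earlier output instead of appending duplicates. The `batch_output_path` helper and the `/data/logs` root are hypothetical names of mine; `log_schema` and the SQLContext usage are from the snippet above.

```python
from datetime import datetime

HDFS_DEST = "/data/logs"  # hypothetical destination root

def batch_output_path(batch_time, dest=HDFS_DEST):
    """Map a batch time to a deterministic output directory, so that
    reprocessing the same batch targets the same path."""
    return "%s/batch_ts=%s" % (dest, batch_time.strftime("%Y%m%d%H%M%S"))

def save_rdd(batch_time, rdd):
    # Assumes log_schema and SQLContext as in the snippet above.
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    # Overwrite only this batch's directory; other batches' directories
    # are untouched, so a replayed batch is effectively idempotent.
    df.write.mode("overwrite") \
      .partitionBy("log_date") \
      .parquet(batch_output_path(batch_time))
```

The downstream Spark SQL job would then read `HDFS_DEST` with a glob over the `batch_ts=*` directories. Whether this is the sanctioned way to get exactly-once output is exactly what I'm unsure about.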