On 25 Nov 2015, at 07:01, Michael <mfr...@fastest.cc> wrote:
so basically writing them into a temporary directory named with the batch time and then moving the files to their destination on success? I wish there were a way to skip moving files around and to set the output filenames directly.

That's how everything else does it: it relies on rename() being atomic and O(1) on HDFS. Just create the temp dir under the same parent dir as the destination, so that on encrypted HDFS both are in the same encryption zone. And know that renames on s3n/s3a are neither atomic nor O(1), so that's not how you commit things there.

Thanks Burak :)

-Michael

On Mon, Nov 23, 2015, at 09:19 PM, Burak Yavuz wrote:

Not sure if it would be the most efficient, but maybe you can think of the filesystem as a key-value store and write each batch to a sub-directory whose name is the batch time. If the directory already exists, then you shouldn't write it. You could then have a follow-up batch job that coalesces the files, in order to "close the day".

Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:

Hi all,

I'm working on a project with Spark Streaming; the goal is to process log files from S3 and save them on Hadoop to later analyze them with Spark SQL. Everything works well, except when I kill the Spark application and restart it: it picks up from the latest processed batch and reprocesses it, which results in duplicate data on HDFS.

How can I make the write step to HDFS idempotent? I couldn't find any way to control, for example, the filenames of the Parquet files being written; the idea being to include the batch time so that the same batch always gets written to the same path. I've also tried mode("overwrite"), but it looks like each batch gets written to the same file every time.

Any help would be greatly appreciated.
Thanks,
Michael

--

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

def save_rdd(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)

def create_ssc(checkpoint_dir, spark_master):
    sc = SparkContext(spark_master, app_name)
    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)
    # (creation of `dstream` from the S3 source is omitted here)
    parsed = dstream.map(lambda line: log_parser(line))
    parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir, lambda: create_ssc(checkpoint_dir, spark_master))
ssc.start()
ssc.awaitTermination()

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org