On 25 Nov 2015, at 07:01, Michael <mfr...@fastest.cc> wrote:

So basically writing them into a temporary directory named with the batch time 
and then moving the files to their destination on success? I wish there were a 
way to skip moving files around and be able to set the output filenames.


That's how everything else does it: it relies on rename() being atomic and O(1) 
on HDFS. Just create the temp dir with the same parent dir as the destination, 
so that in encrypted HDFS they are both in the same encryption zone.

And know that renames in s3n/s3a are neither atomic nor O(1), so that's not how 
you commit things there.
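
A minimal sketch of that write-then-rename commit, assuming the sc and 
hdfs_dest_directory from the code further down this thread; commit_batch and the 
path layout are made up for illustration, and the _jvm/_jsc gateway access is a 
common but unofficial way to reach the Hadoop FileSystem API from PySpark:

def commit_batch(batch_time, df):
    # Keep the temp dir under the same parent as the destination so that,
    # on encrypted HDFS, both paths sit in the same encryption zone.
    tmp_path = "%s/_tmp_%s" % (hdfs_dest_directory, batch_time)
    final_path = "%s/batch_time=%s" % (hdfs_dest_directory, batch_time)

    # Reach the Hadoop FileSystem API through the Py4J gateway.
    Path = sc._jvm.org.apache.hadoop.fs.Path
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

    if fs.exists(Path(final_path)):
        return  # this batch was already committed on a previous run

    df.write.mode("overwrite").parquet(tmp_path)  # write the batch to the temp dir
    fs.rename(Path(tmp_path), Path(final_path))   # atomic, O(1) commit on HDFS

On a restart that replays the last batch, the exists() check turns the call into 
a no-op, so the same batch never lands on HDFS twice.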

Thanks Burak :)

-Michael


On Mon, Nov 23, 2015, at 09:19 PM, Burak Yavuz wrote:
Not sure if it would be the most efficient, but maybe you can think of the 
filesystem as a key-value store and write each batch to a sub-directory whose 
name is the batch time. If the directory already exists, then you shouldn't 
write it. Then you may have a following batch job that coalesces files, in 
order to "close the day".
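
A rough sketch of such a follow-up "close the day" job, assuming the per-batch 
sub-directories are Parquet output under the hdfs_dest_directory and sqlContext 
used in the code below; the day path and the target of 8 output files are 
purely illustrative:

# Read every batch sub-directory written for the day and rewrite it as a
# small number of larger Parquet files.
day_dir = "%s/log_date=2015-11-23" % hdfs_dest_directory
day_df = sqlContext.read.parquet(day_dir)
day_df.coalesce(8).write.mode("overwrite").parquet(day_dir + "_compacted")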

Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:
Hi all,

I'm working on a project with Spark Streaming; the goal is to process log
files from S3 and save them to Hadoop to later analyze them with
Spark SQL.
Everything works well except when I kill the Spark application and
restart it: it picks up from the latest processed batch and reprocesses
it, which results in duplicate data on HDFS.

How can I make the write step to HDFS idempotent? I couldn't find any
way to control, for example, the filenames of the Parquet files being
written; the idea is to include the batch time so that the same batch
always gets written to the same path.
I've also tried mode("overwrite"), but it looks like each batch gets
written to the same file every time.
Any help would be greatly appreciated.

Thanks,
Michael

--

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

# log_schema, log_parser, hdfs_dest_directory, app_name, batch_interval,
# checkpoint_dir and spark_master are defined elsewhere; the creation of
# dstream (the S3 input stream) is also omitted here.

def save_rdd(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)

def create_ssc(checkpoint_dir, spark_master):
    sc = SparkContext(spark_master, app_name)
    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)

    parsed = dstream.map(lambda line: log_parser(line))
    parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))

    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir,
                                   lambda: create_ssc(checkpoint_dir, spark_master))
ssc.start()
ssc.awaitTermination()

---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>


