// This is the code - hdfs_path = <path to a file in hdfs>
// Choose the reader from the extension embedded in the path, load the data,
// stamp audit columns, append to Parquet, then log lineage to DynamoDB.
if (hdfs_path.contains(".avro")) {
  data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_path)
} else if (hdfs_path.contains(".tsv")) {
  data_df = spark.read
    .option("delimiter", "\t")
    .option("header", "true")
    .csv(hdfs_path)
} else if (hdfs_path.contains(".scsv")) {
  data_df = spark.read
    .option("delimiter", ";")
    .option("header", "true")
    .csv(hdfs_path)
} else {
  // Unrecognized extension: abort the job with a non-zero exit code.
  System.exit(1)
}

// Audit columns recording who created the rows and when.
data_df = data_df
  .withColumn("edl_created_by", lit("IndexerSpark"))
  .withColumn("edl_created_at", lit(currentTime))

// Append the enriched rows to the Parquet destination.
data_df.write.mode("append").parquet(dest_file)

// Record a file-lineage entry by invoking the update Lambda.
// NOTE(review): this is driver-side code; if it were ever moved inside an
// RDD/DataFrame closure it would run once per task — confirm it stays here.
val status1 = AddLogToDynamo(
  Json.toJson(fileLineageEntity)(fileLineAgeFormat),
  conf.getString("lambda.filelineage.dynamodb.update.function.name"),
  GetAuth.getLambdaClient
)
/**
 * Synchronously invokes the named AWS Lambda function with the given JSON
 * payload and returns the Lambda's response payload decoded as UTF-8 text.
 *
 * @param updatedLambdaJson  JSON document sent as the invocation payload
 * @param updateFunctionName name of the Lambda function to invoke
 * @param lambdaClient       configured AWS Lambda client used for the call
 * @return the Lambda response payload as a UTF-8 string
 */
def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String, lambdaClient: AWSLambdaClient): String = {
  println("new metadata to be updated: " + updatedLambdaJson)
  val updatelambdaReq: InvokeRequest = new InvokeRequest()
  updatelambdaReq.setFunctionName(updateFunctionName)
  updatelambdaReq.setPayload(updatedLambdaJson.toString())
  println("Calling lambda to add log")
  // Synchronous invoke; the returned payload is the Lambda's result.
  // Last expression is the method's value — no explicit `return` needed.
  byteBufferToString(lambdaClient.invoke(updatelambdaReq).getPayload(), Charset.forName("UTF-8"))
}
Harsh Choudhary On Tue, Oct 17, 2017 at 5:32 PM, ayan guha <guha.a...@gmail.com> wrote: > Can you share your code? > > On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary <shry.ha...@gmail.com> > wrote: > >> Hi >> >> I'm running a Spark job in which I am appending new data into Parquet >> file. At last, I make a log entry in my Dynamodb table stating the number >> of records appended, time etc. Instead of one single entry in the database, >> multiple entries are being made to it. Is it because of parallel execution >> of code in workers? If it is so then how can I solve it so that it only >> writes once. >> >> *Thanks!* >> >> *Cheers!* >> >> Harsh Choudhary >> > -- > Best Regards, > Ayan Guha >