Hi

@Marco, the multiple rows written are not dupes, as the current-timestamp
field is different in each of them.

@Ayan I checked and found that my whole code is being run twice. There seems
to be no error, though. Could the cluster manager be re-running the driver,
and is that behaviour configurable?
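
In case it helps narrow it down: if this is YARN cluster mode, the driver runs
inside the ApplicationMaster, and YARN retries the AM up to
yarn.resourcemanager.am.max-attempts times (2 by default), which would match an
exact double run; spark.yarn.maxAppAttempts can cap that at submit time. A
minimal sketch for logging the attempt id from the driver, assuming the usual
spark: SparkSession is in scope (applicationAttemptId is empty on some cluster
managers):

// print the application and attempt ids so a cluster-manager re-run
// shows up in the driver log (attempt "2" means the driver was re-launched)
val sc = spark.sparkContext
val attempt = sc.applicationAttemptId.getOrElse("1")
println(s"appId=${sc.applicationId} attempt=$attempt")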



On Tue, Oct 17, 2017 at 6:45 PM, ayan guha <guha.a...@gmail.com> wrote:

> It should not be parallel execution, as the logging code is called in the
> driver. Have you checked whether your driver is re-run by the cluster manager
> due to any failure or error situation?
>
> On Tue, Oct 17, 2017 at 11:52 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hi
>>  If the problem is really with parallel execution you can try calling
>> repartition(1) before you save.
>> Alternatively, try storing the data in a CSV file and see if you get the
>> same behaviour, to rule out DynamoDB issues.
>> Also, are the multiple rows being written dupes (do they all have the same
>> fields/values)?
>> Hth
>>
>>
>> On Oct 17, 2017 1:08 PM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>>
>>> This is the code -
>>> hdfs_path = <path to a file in hdfs>
>>> if(hdfs_path.contains(".avro")){
>>>       data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_path)
>>>     }else if(hdfs_path.contains(".tsv")){
>>>       data_df = spark.read.option("delimiter", "\t").option("header", "true").csv(hdfs_path)
>>>     }else if(hdfs_path.contains(".scsv")){
>>>       data_df = spark.read.option("delimiter", ";").option("header", "true").csv(hdfs_path)
>>>     }else{
>>>       System.exit(1)
>>>     }
>>>     data_df = data_df.withColumn("edl_created_by", lit("IndexerSpark")).withColumn("edl_created_at", lit(currentTime))
>>>     data_df.write.mode("append").parquet(dest_file)
>>>     val status1 = AddLogToDynamo(Json.toJson(fileLineageEntity)(fileLineAgeFormat), conf.getString("lambda.filelineage.dynamodb.update.function.name"), GetAuth.getLambdaClient)
>>>
>>>     def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String, lambdaClient: AWSLambdaClient): String = {
>>>       System.out.println("new metadata to be updated: " + updatedLambdaJson);
>>>       val updatelambdaReq: InvokeRequest = new InvokeRequest();
>>>       updatelambdaReq.setFunctionName(updateFunctionName);
>>>       updatelambdaReq.setPayload(updatedLambdaJson.toString());
>>>       System.out.println("Calling lambda to add log");
>>>       val updateLambdaResult = byteBufferToString(lambdaClient.invoke(updatelambdaReq).getPayload(), Charset.forName("UTF-8"));
>>>       return updateLambdaResult;
>>>   }
>>>
>>>
>>> Harsh Choudhary
>>>
>>> On Tue, Oct 17, 2017 at 5:32 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Can you share your code?
>>>>
>>>> On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary <shry.ha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I'm running a Spark job in which I append new data to a Parquet file.
>>>>> At the end, I make a log entry in my DynamoDB table stating the number
>>>>> of records appended, the time, etc. Instead of a single entry in the
>>>>> database, multiple entries are being made. Is this because of parallel
>>>>> execution of code in the workers? If so, how can I fix it so that it
>>>>> only writes once?
>>>>>
>>>>> *Thanks!*
>>>>>
>>>>> *Cheers!*
>>>>>
>>>>> Harsh Choudhary
>>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>
>
> --
> Best Regards,
> Ayan Guha
>
