There are no failures or errors, yet I am still seeing duplicates. All
steps and stages succeed, and speculation is even turned off.

On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Are you certain you aren't getting any failed tasks or other errors?
> Output actions like foreach aren't exactly once and will be retried on
> failures.
>
> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>
>> Dear fellow Spark Users,
>>
>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>> listens for campaigns based on live stock feeds, with a batch duration of
>> 5 seconds. The application uses Kafka DirectStream, and there are three
>> streams, split by feed source. As shown in the code snippet below, I union
>> the three streams and try to remove duplicate campaigns using reduceByKey,
>> keyed on the customer and campaignId. However, I see a lot of duplicate
>> emails being sent for the same key in the same batch. I expected
>> reduceByKey to remove the duplicate campaigns within a batch based on
>> customer and campaignId. I even log the key and batch time before sending
>> each email, and the logs clearly show duplicates. Some duplicates were
>> removed after I added logging in the reduceByKey function, but they are
>> not eliminated completely.
>>
>> JavaDStream<Campaign> matchedCampaigns = 
>> stream1.transform(CmpManager::getMatchedCampaigns)
>>             .union(stream2).union(stream3).cache();
>> JavaPairDStream<String, Campaign> uniqueCampaigns = 
>> matchedCampaigns.mapToPair(campaign->{
>>         String key=campaign.getCustomer()+"_"+campaign.getId();
>>         return new Tuple2<String, Campaign>(key, campaign);
>>     }).reduceByKey((campaign1, campaign2)->{return campaign1;});
>>
>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>
>> I am not able to figure out where I am going wrong here. Please help me
>> get rid of this weird problem. Previously we used createStream to listen
>> to the Kafka queue (1 partition), and we didn't face this issue there. But
>> when we moved to directStream (100 partitions), we could easily reproduce
>> this issue under high load.
>>
>> *Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
>> instead of reduceByKey, but even that didn't help.
>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
>> Durations.seconds(5))
>> I have also asked for help on Stack Overflow, but I haven't received any
>> solutions to this issue.
>>
>>
>> *Stack Overflow link:*
>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>
>>
>> Thanks and Regards
>> Dev
>>
>
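
For anyone following the thread, here is a minimal sketch in plain Java (no Spark) of the two ideas discussed above: keeping the first campaign per key within one batch, as reduceByKey((c1, c2) -> c1) is expected to do, and guarding the send so that a retried output action does not send the same key twice for the same batch. The names DedupSketch, dedupeBatch, sendOnce and alreadySent are hypothetical, and Campaign here is only a stand-in for the class in the original snippet. The in-memory set is an assumption for illustration; on a real cluster the guard would probably have to live in an external store shared by all executors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupSketch {

    // Hypothetical stand-in for the Campaign class used in the thread.
    static final class Campaign {
        final String customer;
        final String id;

        Campaign(String customer, String id) {
            this.customer = customer;
            this.id = id;
        }

        // Same composite key as in the original snippet.
        String key() {
            return customer + "_" + id;
        }
    }

    // Keeps the first campaign seen per key within a single batch,
    // which is what reduceByKey((c1, c2) -> c1) is expected to yield.
    static Map<String, Campaign> dedupeBatch(List<Campaign> batch) {
        Map<String, Campaign> unique = new LinkedHashMap<>();
        for (Campaign c : batch) {
            unique.merge(c.key(), c, (first, second) -> first);
        }
        return unique;
    }

    // Hypothetical in-memory guard: (batchTime, key) pairs already sent.
    // Assumption: a single JVM; a cluster would need a shared store.
    static final Set<String> alreadySent = ConcurrentHashMap.newKeySet();

    // Sends each key at most once per batch time and returns the keys sent.
    static List<String> sendOnce(long batchTimeMs, Map<String, Campaign> unique) {
        List<String> sent = new ArrayList<>();
        for (String key : unique.keySet()) {
            // add() returns false when this (batch, key) pair was seen before.
            if (alreadySent.add(batchTimeMs + "|" + key)) {
                sent.add(key); // a real sendEmail call would go here
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Campaign> batch = Arrays.asList(
                new Campaign("alice", "42"),
                new Campaign("alice", "42"),
                new Campaign("bob", "7"));
        Map<String, Campaign> unique = dedupeBatch(batch);
        System.out.println(sendOnce(1000L, unique)); // prints [alice_42, bob_7]
        System.out.println(sendOnce(1000L, unique)); // simulated retry, prints []
    }
}
```

Keying the guard on batch time plus campaign key means the same campaign can still be sent again in a later batch, while a retry of the same batch becomes a no-op.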
