There are no failures or errors. Even so, I am seeing duplicates. The stages and tasks all complete successfully, and speculation is turned off as well.
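For reference, a minimal sketch of the driver setup being described (the property names are the standard Spark ones; the app name is hypothetical and the values only illustrate "speculation off, 5-second batches" as stated above, not the exact job):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class DriverSetup {
        public static JavaStreamingContext create() {
            SparkConf conf = new SparkConf()
                .setAppName("campaign-matcher")        // hypothetical app name
                .set("spark.speculation", "false");    // speculation turned off, as stated above
            // 5-second batch interval, as in the original post
            return new JavaStreamingContext(conf, Durations.seconds(5));
        }
    }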
On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Are you certain you aren't getting any failed tasks or other errors?
> Output actions like foreach aren't exactly-once and will be retried on
> failures.
>
> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>
>> Dear fellow Spark Users,
>>
>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>> listens to campaigns based on live stock feeds, and the batch duration is
>> 5 seconds. The application uses a Kafka DirectStream, and based on the
>> feed source there are three streams. As shown in the code snippet, I do a
>> union of the three streams and try to remove the duplicate campaigns
>> received using reduceByKey keyed on customer and campaignId. I can see a
>> lot of duplicate emails being sent out for the same key in the same
>> batch. I was expecting reduceByKey to remove the duplicate campaigns in a
>> batch based on customer and campaignId. In the logs I am even printing
>> the key and batch time before sending the email, and I can clearly see
>> duplicates. After adding a log statement inside the reduceByKey function
>> I can see some duplicates getting removed, but it is not eliminating them
>> completely.
>>
>> JavaDStream<Campaign> matchedCampaigns =
>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>            .union(stream2).union(stream3).cache();
>>
>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>     matchedCampaigns.mapToPair(campaign -> {
>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>         return new Tuple2<String, Campaign>(key, campaign);
>>     }).reduceByKey((campaign1, campaign2) -> campaign1);
>>
>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>
>> I am not able to figure out where I am going wrong here. Please help me
>> get rid of this weird problem. Previously we were using createStream for
>> listening to the Kafka queue (number of partitions 1), and there we
>> didn't face this issue. But when we moved to directStream (number of
>> partitions 100) we could easily reproduce this issue under high load.
>>
>> *Note:* I even tried reduceByKeyAndWindow with a window and slide
>> duration of 5 seconds instead of the reduceByKey operation, but even
>> that didn't help:
>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
>>     Durations.seconds(5), Durations.seconds(5));
>>
>> I have even requested help on Stack Overflow, but I haven't received any
>> solutions to this issue.
>>
>> *Stack Overflow Link*
>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>
>> Thanks and Regards
>> Dev
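To tie the two points together, here is a minimal sketch of the output side of the pipeline (Campaign, CmpManager, and the customer/campaignId key come from the post above; the per-partition loop, the logging, and the idempotency comment are illustrative assumptions, not the actual job):

    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import scala.Tuple2;

    public class SendEmails {
        public static void attach(JavaPairDStream<String, Campaign> uniqueCampaigns) {
            // Two-argument foreachRDD makes the batch time available to the output action.
            uniqueCampaigns.foreachRDD((rdd, time) -> {
                rdd.foreachPartition(records -> {
                    while (records.hasNext()) {
                        Tuple2<String, Campaign> rec = records.next();
                        // Log key + batch time before sending, as described in the post.
                        System.out.println("batch " + time + " sending for key " + rec._1());
                        // reduceByKey only de-duplicates within one batch's RDD; the output
                        // action itself can still run more than once if a task is re-executed,
                        // so the actual send would need to be idempotent (e.g. keyed on
                        // rec._1() plus the batch time in whatever system sends the emails).
                        // CmpManager.sendEmail(rec._2());   // hypothetical per-record send
                    }
                });
            });
        }
    }

This is only a sketch of where the batch time and key are available at send time; it does not by itself explain or remove the duplicates described above.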