Dear fellow Spark users,

My Spark Streaming application (Spark 2.0 on an AWS EMR YARN cluster) listens for campaigns based on live stock feeds, with a batch duration of 5 seconds. The application uses a Kafka DirectStream, and there are three streams, one per feed source. As shown in the code snippet below, I union the three streams and then try to remove duplicate campaigns with reduceByKey, keyed on customer and campaign ID.

However, I see a lot of duplicate emails being sent out for the same key within the same batch. I expected reduceByKey to remove the duplicate campaigns in a batch based on customer and campaign ID. In the logs I even print the key and batch time before sending the email, and I can clearly see the duplicates. After adding a log statement inside the reduceByKey function I can see some duplicates being removed, but they are not eliminated completely.
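To illustrate the behaviour I expect, here is a minimal self-contained sketch (not our production code; the Campaign payload is simplified to a plain String): in a plain batch job, reduceByKey leaves exactly one record per key, even across 100 partitions.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ReduceByKeyExpectation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("dedup-test");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Two duplicates for cust1_cmp1, one record for cust2_cmp9,
            // spread across 100 partitions as with the direct stream
            JavaPairRDD<String, String> campaigns = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("cust1_cmp1", "payloadA"),
                    new Tuple2<>("cust1_cmp1", "payloadB"),
                    new Tuple2<>("cust2_cmp9", "payloadC")), 100);

            // Keep an arbitrary record per key, same reduce function as in the streaming job
            JavaPairRDD<String, String> unique = campaigns.reduceByKey((c1, c2) -> c1);

            // Prints 2, i.e. one record per key -- this is what I expect per batch
            System.out.println(unique.count());
        }
    }
}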
The streaming code looks like this:

JavaDStream<Campaign> matchedCampaigns = stream1
        .transform(CmpManager::getMatchedCampaigns)
        .union(stream2)
        .union(stream3)
        .cache();

JavaPairDStream<String, Campaign> uniqueCampaigns = matchedCampaigns
        .mapToPair(campaign -> {
            String key = campaign.getCustomer() + "_" + campaign.getId();
            return new Tuple2<String, Campaign>(key, campaign);
        })
        .reduceByKey((campaign1, campaign2) -> campaign1);

uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I cannot figure out where I am going wrong here. Please help me get rid of this strange problem.

Previously we used createStream to consume the Kafka queue (with 1 partition), and there we did not face this issue. After moving to DirectStream (with 100 partitions), we can easily reproduce the issue under high load.

*Note:* I also tried reduceByKeyAndWindow with a window and slide duration of 5 seconds instead of the plain reduceByKey operation, but even that did not help:

uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5), Durations.seconds(5));

I have also asked for help on Stack Overflow, but have not received any solution so far:
https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch

Thanks and regards,
Dev
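P.S. For completeness, this is roughly how the logging around sendEmail looks. It is a simplified sketch, not the real CmpManager: I am assuming here the two-argument foreachRDD variant that also passes the batch Time (since we log the batch time), and a cut-down Campaign class.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Time;

// Minimal stand-in for our domain class (the real one has more fields)
class Campaign implements java.io.Serializable {
    String customer;
    String id;
}

public class CmpManager {
    // Simplified sketch of the real method; signature assumed for illustration
    public static void sendEmail(JavaPairRDD<String, Campaign> rdd, Time batchTime) {
        rdd.foreach(tuple -> {
            // Each key should appear here at most once per batchTime,
            // yet the logs show the same key several times within one batch
            System.out.println("batch=" + batchTime + " key=" + tuple._1());
            // ... actual email dispatch for tuple._2() goes here ...
        });
    }
}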