Hi, I applied mapToPair followed by reduceByKey on a DStream to obtain a JavaPairDStream<String, Map<String, Object>>. I then need to apply flatMapToPair and a second reduceByKey to that DStream, but I do not see any log output from the second reduceByKey. Can anyone explain why this is happening?
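One likely reason for missing output is that a reduce function only combines two values that share a key. For a key that occurs exactly once in a batch, reduceByKey keeps that single value as-is and never calls the function, so nothing inside it is printed. The sketch below models this in plain Java with java.util.Map.merge, which follows the same rule (the remapping function runs only when the key is already present). It illustrates the semantics only; it is not Spark's implementation, and the class and method names are hypothetical. Also, when running on a cluster, System.out.println inside a transformation writes to the executors' stdout, not to the driver console.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class ReduceByKeyModel {
    // Plain-Java model of per-key reduction (hypothetical helper, not a Spark API):
    // the first value for a key is stored unchanged, and `merge` runs only when
    // another value with the same key arrives.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs, BinaryOperator<V> merge) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            result.merge(pair.getKey(), pair.getValue(), merge);
        }
        return result;
    }

    public static void main(String[] args) {
        // Records the merge calls; stands in for the log lines inside the reduce function.
        List<String> log = new ArrayList<>();
        BinaryOperator<Long> sum = (a, b) -> {
            log.add("merging " + a + " and " + b);
            return a + b;
        };

        // Every key occurs once, so the merge function is never invoked.
        Map<String, Long> unique = reduceByKey(List.of(
                Map.entry("host_h1_tag_x", 1L),
                Map.entry("host_h1_tag_y", 1L)), sum);
        System.out.println(unique + " merge calls: " + log.size());
        // prints {host_h1_tag_x=1, host_h1_tag_y=1} merge calls: 0

        // A repeated key triggers exactly one merge call.
        Map<String, Long> repeated = reduceByKey(List.of(
                Map.entry("host_h1_tag_x", 1L),
                Map.entry("host_h1_tag_x", 1L)), sum);
        System.out.println(repeated + " merge calls: " + log.size());
        // prints {host_h1_tag_x=2} merge calls: 1
    }
}
```

In the question's pipeline, this means the second reduce only runs for "host_<host>_tag_<tag>" keys that at least two level-1 rows share within the same batch.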
Find my code below:

    /**
     * GroupLevel1 groups on articleId, host and tags.
     */
    JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
            .mapToPair(new PairFunction<Map<String, Object>, String, Map<String, Object>>() {
                private static final long serialVersionUID = 5196132687044875422L;

                @Override
                public Tuple2<String, Map<String, Object>> call(Map<String, Object> map) throws Exception {
                    String host = (String) map.get("host");
                    String articleId = (String) map.get("articleId");
                    List<?> tags = (List<?>) map.get("tags");
                    if (host == null || articleId == null) {
                        logger.error("*********** Error Doc ************\n" + map);
                    }
                    String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();
                    // logger.info(key);
                    System.out.println("Printing Key - " + key);
                    map.put("articlecount", 1L);
                    return new Tuple2<String, Map<String, Object>>(key, map);
                }
            })
            .reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Map<String, Object> call(Map<String, Object> map1, Map<String, Object> map2)
                        throws Exception {
                    Long count1 = (Long) map1.get("articlecount");
                    Long count2 = (Long) map2.get("articlecount");
                    map1.put("articlecount", count1 + count2);
                    return map1;
                }
            });

    /**
     * Grouping level 1 groups on articleId + host + tags.
     * An article can have multiple tags.
     * Grouping level 2 does the following:
     * 1. For each tag in a row, find occurrences of that tag in other rows.
     * 2. If the tag is found in another row, add the articleCount of the current row
     *    and that row, and store the sum as the articleCount for that tag.
     * Note: the idea behind this grouping is to get the total count of articles that
     * contain a particular tag and preserve this value.
     */
    JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1.flatMapToPair(
            new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
                @Override
                public Iterable<Tuple2<String, Map<String, Object>>> call(
                        Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
                    System.out.println("group level 2 tuple 1 -" + stringMapTuple2._1());
                    System.out.println("group level 2 tuple 2 -" + stringMapTuple2._2());
                    ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
                    List<Tuple2<String, Map<String, Object>>> tagKeyList = new ArrayList<>();
                    String host = (String) stringMapTuple2._2().get("host");
                    StringBuilder key;
                    for (String tag : tagList) {
                        key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
                        System.out.println("generated Key - " + key);
                        tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
                    }
                    return tagKeyList;
                }
            });

    groupLevel2 = groupLevel2.reduceByKey(
            new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
                @Override
                public Map<String, Object> call(Map<String, Object> dataMap1, Map<String, Object> dataMap2)
                        throws Exception {
                    System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
                    System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
                    Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
                    Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");
                    if (articleMap1 == null || articleMap1.isEmpty()) {
                        System.out.println("returning because map 1 null");
                        return dataMap2;
                    }
                    if (articleMap2 == null || articleMap2.isEmpty()) {
                        System.out.println("returning because map 2 null");
                        return dataMap1;
                    }
                    for (String articleId : articleMap1.keySet()) {
                        if (articleMap2.containsKey(articleId)) {
                            articleMap2.put(articleId, articleMap1.get(articleId) + articleMap2.get(articleId));
                        } else {
                            articleMap2.put(articleId, articleMap1.get(articleId));
                        }
                    }
                    System.out.println("putting back new map" + dataMap2.put("articleId", articleMap2));
                    dataMap2.put("Hello", "hello");
                    return dataMap2;
                }
            });

In the logs, I do not see the "Hello" entry in my DStream when I display the data using forEach on the DStream.

Regards,
Deepesh
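Two further points about the second reduce. In groupLevel1 the "articleId" value is read as a String, so if the level-2 reduce function does run, the cast `(Map<String, String>) dataMap1.get("articleId")` would throw a ClassCastException right after the first "Type of article map" line is printed. Also, flatMapToPair emits the same map object for every tag of a row, so mutating dataMap2 inside the reduce may change values seen under other keys. Below is a plain-Java sketch of one way to express the per-tag counting described in the level-2 comment without either problem. It assumes the goal is, per "host_<host>_tag_<tag>" key, a map of articleId to summed articlecount; fanOut and mergeCounts are hypothetical names, and the local loop only stands in for flatMapToPair plus reduceByKey rather than calling Spark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TagGroupingSketch {
    // Level-2 fan-out for one level-1 record: one pair per tag, keyed "host_<host>_tag_<tag>"
    // as in the question. The value is a new articleId -> articlecount map (an assumed shape),
    // so no value object is shared between the keys emitted for different tags.
    static List<Map.Entry<String, Map<String, Long>>> fanOut(Map<String, Object> record) {
        String host = (String) record.get("host");
        String articleId = (String) record.get("articleId"); // a String, not a Map
        Long count = (Long) record.get("articlecount");
        List<Map.Entry<String, Map<String, Long>>> out = new ArrayList<>();
        for (Object tag : (List<?>) record.get("tags")) {
            out.add(Map.entry("host_" + host + "_tag_" + tag, Map.of(articleId, count)));
        }
        return out;
    }

    // Merge function for the second reduce: sums counts per articleId into a fresh map
    // and leaves both arguments unmodified.
    static Map<String, Long> mergeCounts(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> merged = new HashMap<>(a);
        b.forEach((articleId, count) -> merged.merge(articleId, count, Long::sum));
        return merged;
    }

    public static void main(String[] args) {
        // Two hypothetical level-1 results for the same host.
        List<Map<String, Object>> level1 = List.of(
                Map.<String, Object>of("host", "h1", "articleId", "a1",
                        "tags", List.of("x", "y"), "articlecount", 2L),
                Map.<String, Object>of("host", "h1", "articleId", "a2",
                        "tags", List.of("x"), "articlecount", 3L));

        // Local stand-in for reduceByKey over the fanned-out pairs (sorted for stable output).
        Map<String, Map<String, Long>> level2 = new TreeMap<>();
        for (Map<String, Object> record : level1) {
            for (Map.Entry<String, Map<String, Long>> pair : fanOut(record)) {
                level2.merge(pair.getKey(), pair.getValue(), TagGroupingSketch::mergeCounts);
            }
        }
        System.out.println(level2);
        // prints {host_h1_tag_x={a1=2, a2=3}, host_h1_tag_y={a1=2}}
    }
}
```

Building a new map in mergeCounts instead of writing into an argument keeps the merge free of side effects. That matters because Spark may apply the function in any order and on either side of the shuffle.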