Hi,
I applied mapToPair followed by reduceByKey to a DStream to obtain a
JavaPairDStream<String, Map<String, Object>>.
I then need to apply flatMapToPair and reduceByKey to the DStream obtained
above, but I do not see any log output from this reduceByKey operation.
Can anyone explain why this is happening?


My code is below:



        /*
         * GroupLevel1 groups on articleId, host and tags.
         */
        JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
                .mapToPair(
                        new PairFunction<Map<String, Object>, String, Map<String, Object>>() {

                            private static final long serialVersionUID = 5196132687044875422L;

                            @Override
                            public Tuple2<String, Map<String, Object>> call(
                                    Map<String, Object> map) throws Exception {
                                String host = (String) map.get("host");
                                String articleId = (String) map.get("articleId");
                                List tags = (List) map.get("tags");

                                if (host == null || articleId == null) {
                                    logger.error("*********** Error Doc ************\n" + map);
                                }
                                String key = "articleId_" + articleId
                                        + "_host_" + host + "_tags_" + tags.toString();

//                                logger.info(key);
                                System.out.println("Printing Key - " + key);

                                map.put("articlecount", 1L);

                                return new Tuple2<String, Map<String, Object>>(key, map);
                            }
                        })
                .reduceByKey(
                        new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Map<String, Object> call(
                                    Map<String, Object> map1,
                                    Map<String, Object> map2) throws Exception {
                                Long count1 = (Long) map1.get("articlecount");
                                Long count2 = (Long) map2.get("articlecount");

                                map1.put("articlecount", count1 + count2);
                                return map1;
                            }
                        });













        /*
         * Grouping level 1 groups on articleId + host + tags.
         * Tags can be multiple for an article.
         * Grouping level 2 does:
         *  1. For each tag in a row, find occurrences of that tag in other rows.
         *  2. If a tag is found in another row, add the articleCount of the
         *     current and the new row and put it as the articleCount for that tag.
         * Note:
         *  The idea behind this grouping is to get the counts of all articles
         *  that contain a particular tag and preserve this value.
         */


        JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1.flatMapToPair(
                new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
            @Override
            public Iterable<Tuple2<String, Map<String, Object>>> call(
                    Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {

                System.out.println("group level 2 tuple 1 -" + stringMapTuple2._1());
                System.out.println("group level 2 tuple 2 -" + stringMapTuple2._2());
                ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
                // One output pair per tag; every pair shares the same value map.
                List<Tuple2<String, Map<String, Object>>> tagKeyList =
                        new ArrayList<Tuple2<String, Map<String, Object>>>();
                String host = (String) stringMapTuple2._2().get("host");
                StringBuilder key;
                for (String tag : tagList) {
                    key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
                    System.out.println("generated Key - " + key);
                    tagKeyList.add(new Tuple2<String, Map<String, Object>>(
                            key.toString(), stringMapTuple2._2()));
                }
                return tagKeyList;
            }
        });

        groupLevel2 = groupLevel2.reduceByKey(
                new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
            @Override
            public Map<String, Object> call(Map<String, Object> dataMap1,
                    Map<String, Object> dataMap2) throws Exception {
                System.out.println("Type of article map in 1 "
                        + dataMap1.get("articleId").getClass());
                System.out.println("Type of article map in 2 "
                        + dataMap2.get("articleId").getClass());
                Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
                Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");

                if (articleMap1 == null || articleMap1.isEmpty()) {
                    System.out.println("returning because map 1 null");
                    return dataMap2;
                }

                if (articleMap2 == null || articleMap2.isEmpty()) {
                    System.out.println("returning because map 2 null");
                    return dataMap1;
                }
                for (String articleId : articleMap1.keySet()) {
                    if (articleMap2.containsKey(articleId)) {
                        articleMap2.put(articleId,
                                articleMap1.get(articleId) + articleMap2.get(articleId));
                    } else {
                        articleMap2.put(articleId, articleMap1.get(articleId));
                    }
                }
                System.out.println("putting back new map"
                        + dataMap2.put("articleId", articleMap2));
                dataMap2.put("Hello", "hello");
                return dataMap2;
            }
        });
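
To make the intent of grouping level 2 concrete, here is a minimal plain-Java sketch (no Spark) of the result described in the comment above: one "host_<host>_tag_<tag>" key per tag, with the articlecount values summed per key. The class TagCountSketch and its methods row and countByHostAndTag are hypothetical names used only for illustration, and the sample rows are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TagCountSketch {

    // Hypothetical stand-in for one row produced by grouping level 1.
    static Map<String, Object> row(String host, List<String> tags, long articleCount) {
        Map<String, Object> m = new HashMap<>();
        m.put("host", host);
        m.put("tags", tags);
        m.put("articlecount", articleCount);
        return m;
    }

    // Emits one host+tag key per tag in each row, then sums articlecount per key.
    static Map<String, Long> countByHostAndTag(List<Map<String, Object>> rows) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Object> r : rows) {
            String host = (String) r.get("host");
            @SuppressWarnings("unchecked")
            List<String> tags = (List<String>) r.get("tags");
            long count = (Long) r.get("articlecount");
            for (String tag : tags) {
                String key = "host_" + host + "_tag_" + tag;
                // Long::sum is applied only when the key already has a total.
                totals.merge(key, count, Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row("example.com", Arrays.asList("spark", "java"), 2L));
        rows.add(row("example.com", Arrays.asList("spark"), 1L));
        System.out.println(countByHostAndTag(rows));
    }
}
```

With these two sample rows, the "spark" key for example.com ends up with a total of 3 and the "java" key with 2.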






/*
 * In the logs, I am not able to see the ("Hello", "hello") entry in my DStream
 * when I display the data using forEach on the DStream.
 */
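
For reference when reading the logs: reduceByKey calls its function only for keys that have at least two values in a batch, so a key with a single value passes through without the function (and its println calls) ever running. The sketch below is a plain-Java analogue of that behaviour built on Map.merge, not Spark code; the class ReduceCallSketch and its methods are hypothetical names, and whether this is what happens in the job above is an assumption that would need checking against the generated keys.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class ReduceCallSketch {

    // Convenience constructor for a (key, value) pair.
    static Map.Entry<String, Long> pair(String key, long value) {
        return new AbstractMap.SimpleEntry<String, Long>(key, value);
    }

    // Plain-Java analogue of reduce-by-key: fn runs only when the key already has a value.
    static <V> Map<String, V> reduceByKey(List<Map.Entry<String, V>> pairs, BinaryOperator<V> fn) {
        Map<String, V> out = new HashMap<>();
        for (Map.Entry<String, V> p : pairs) {
            out.merge(p.getKey(), p.getValue(), fn);
        }
        return out;
    }

    // Counts how many times the reduce function is actually invoked.
    static int countCalls(List<Map.Entry<String, Long>> pairs) {
        final int[] calls = {0};
        reduceByKey(pairs, (a, b) -> {
            calls[0]++;
            return a + b;
        });
        return calls[0];
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> distinctKeys = new ArrayList<>();
        distinctKeys.add(pair("host_a_tag_x", 1L));
        distinctKeys.add(pair("host_a_tag_y", 1L));

        List<Map.Entry<String, Long>> repeatedKey = new ArrayList<>();
        repeatedKey.add(pair("host_a_tag_x", 1L));
        repeatedKey.add(pair("host_a_tag_x", 1L));

        System.out.println("calls with distinct keys: " + countCalls(distinctKeys));
        System.out.println("calls with a repeated key: " + countCalls(repeatedKey));
    }
}
```

Comparing the "generated Key" lines printed by flatMapToPair against this behaviour shows whether any key occurs more than once within a batch.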



Regards,

Deepesh
