I don't see any output action invoked in this code. DStream transformations
are lazy; it won't do anything until you tell it to perform an action (an
output operation such as print() or foreachRDD) that requires the
transformations.
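As a Spark-free illustration of the same principle (this sketch is mine, not from the thread, and uses plain Java streams rather than DStreams): intermediate operations like map() only describe work, and none of their side effects, including logging, happen until a terminal operation forces the pipeline to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();

        // map() is an intermediate (lazy) operation, like Spark's
        // mapToPair/reduceByKey transformations: it only describes work.
        Stream<String> pipeline = Stream.of("a", "b")
                .map(s -> {
                    sideEffects.add(s); // analogous to the println/logger calls
                    return s.toUpperCase();
                });

        // Nothing has executed yet, so no "logs" have appeared:
        System.out.println(sideEffects.isEmpty()); // true

        // A terminal operation (like an output operation on a DStream)
        // finally forces the whole pipeline to run:
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println(sideEffects); // [a, b]
        System.out.println(result);      // [A, B]
    }
}
```

In the streaming code below, the equivalent would be registering an output operation on groupLevel2 (e.g. print() or foreachRDD) and starting the streaming context; without one, neither reduceByKey executes, so neither logs anything.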

On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari
<deepesh.maheshwar...@gmail.com> wrote:
> Hi,
> I have applied mapToPair and then reduceByKey on a DStream to obtain a
> JavaPairDStream<String, Map<String, Object>>.
> I then have to apply flatMapToPair and reduceByKey on the DStream obtained
> above, but I do not see any logs from the reduceByKey operation.
> Can anyone explain why this is happening?
>
>
> Find my code below -
>
>  /***
>          * GroupLevel1 Groups - articleId, host and tags
>          */
>         JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
>                 .mapToPair(
>                         new PairFunction<Map<String, Object>, String, Map<String, Object>>() {
>
>                             private static final long serialVersionUID = 5196132687044875422L;
>
>                             @Override
>                             public Tuple2<String, Map<String, Object>> call(
>                                     Map<String, Object> map) throws Exception {
>                                 String host = (String) map.get("host");
>                                 String articleId = (String) map.get("articleId");
>                                 List tags = (List) map.get("tags");
>
>                                 if (host == null || articleId == null) {
>                                     logger.error("*********** Error Doc ************\n" + map);
>                                 }
>                                 String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();
>
>                                 // logger.info(key);
>                                 System.out.println("Printing Key - " + key);
>                                 map.put("articlecount", 1L);
>
>                                 return new Tuple2<String, Map<String, Object>>(key, map);
>                             }
>                         })
>                 .reduceByKey(
>                         new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
>
>                             private static final long serialVersionUID = 1L;
>
>                             @Override
>                             public Map<String, Object> call(
>                                     Map<String, Object> map1,
>                                     Map<String, Object> map2) throws Exception {
>                                 Long count1 = (Long) map1.get("articlecount");
>                                 Long count2 = (Long) map2.get("articlecount");
>
>                                 map1.put("articlecount", count1 + count2);
>                                 return map1;
>                             }
>                         });
>
>
>
>
>
>         /***
>          * Grouping level 1 groups on articleId+host+tags.
>          * Tags can be multiple for an article.
>          * Grouping level 2 does:
>          *  1. For each tag in a row, find occurrences of that tag in other rows.
>          *  2. If a tag is found in another row, add the articleCount of the
>          *     current and new rows and put that as the articleCount for that tag.
>          *  Note -
>          *      The idea behind this grouping is to get all article counts that
>          *      contain a particular tag and preserve this value.
>          */
>
>
>         JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1
>                 .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
>             @Override
>             public Iterable<Tuple2<String, Map<String, Object>>> call(
>                     Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
>                 System.out.println("group level 2 tuple 1 -" + stringMapTuple2._1());
>                 System.out.println("group level 2 tuple 2 -" + stringMapTuple2._2());
>                 ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
>                 ArrayList tagKeyList = new ArrayList();
>                 String host = (String) stringMapTuple2._2().get("host");
>                 StringBuilder key;
>                 for (String tag : tagList) {
>                     key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
>                     System.out.println("generated Key - " + key);
>                     tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
>                 }
>                 return tagKeyList;
>             }
>         });
>
>         groupLevel2 = groupLevel2.reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
>             @Override
>             public Map<String, Object> call(Map<String, Object> dataMap1,
>                     Map<String, Object> dataMap2) throws Exception {
>                 System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
>                 System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
>                 Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
>                 Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");
>
>                 if (articleMap1 == null || articleMap1.isEmpty()) {
>                     System.out.println("returning because map 1 null");
>                     return dataMap2;
>                 }
>
>                 if (articleMap2 == null || articleMap2.isEmpty()) {
>                     System.out.println("returning because map 2 null");
>                     return dataMap1;
>                 }
>                 for (String articleId : articleMap1.keySet()) {
>                     if (articleMap2.containsKey(articleId)) {
>                         articleMap2.put(articleId, articleMap1.get(articleId) + articleMap2.get(articleId));
>                     } else {
>                         articleMap2.put(articleId, articleMap1.get(articleId));
>                     }
>                 }
>                 System.out.println("putting back new map" + dataMap2.put("articleId", articleMap2));
>                 dataMap2.put("Hello", "hello");
>                 return dataMap2;
>             }
>         });
>
>
>
>
> /***
> In the logs, I am not able to see the "Hello","hello" entry in my DStream when I
> display the data using forEach on the DStream.
> */
>
>
>
> Regards,
>
> Deepesh
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
