@Richard I pasted the two versions of the code below, and I still can't figure out why it doesn't work without .collect(). Any help would be great.
*The code below doesn't work, and sometimes I also run into an OutOfMemoryError.*

    jsonMessagesDStream
        .window(new Duration(60000), new Duration(1000))
        .mapToPair(val -> {
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(val).getAsJsonObject();
            if (jsonObj.has("key4")) {
                return new Tuple2<>("", 0L);
            }
            String symbol = jsonObj.get("key1").getAsString();
            long quantity = jsonObj.get("key2").getAsLong();
            return new Tuple2<>(symbol, quantity);
        })
        .reduceByKey((v1, v2) -> v1 + v2)
        .foreachRDD(stringLongJavaPairRDD -> {
            Map<String, Long> map = new HashMap<>();
            stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
                System.out.println(stringLongTuple2._1()); // Works: I can see values getting printed out
                System.out.println(stringLongTuple2._2()); // Works: I can see values getting printed out
                map.put(stringLongTuple2._1(), stringLongTuple2._2());
            });
            // Always prints the empty JSON doc "{}". But why? Especially when
            // the map is getting filled with values, as confirmed by the
            // print statements above.
            System.out.println(gson.toJson(map));
            producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
        });

    jssc.start();
    jssc.awaitTermination();

VS

*The code below works, but it is slow because of .collectAsMap.*

    jsonMessagesDStream
        .window(new Duration(60000), new Duration(1000))
        .mapToPair(val -> {
            JsonParser parser = new JsonParser();
            JsonObject jsonObj = parser.parse(val).getAsJsonObject();
            if (jsonObj.has("key4")) {
                return new Tuple2<>("", 0L);
            }
            String symbol = jsonObj.get("key1").getAsString();
            long quantity = jsonObj.get("key2").getAsLong();
            return new Tuple2<>(symbol, quantity);
        })
        .reduceByKey((v1, v2) -> v1 + v2)
        .foreachRDD(stringLongJavaPairRDD -> {
            LinkedHashMap<String, Long> map =
                new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());
            producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
        });

    jssc.start();
    jssc.awaitTermination();

On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Kant,
>
> > but would your answer on .collect() change depending on running the
> > spark app in client vs cluster mode?
>
> No, it should make no difference.
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> @Richard I don't see any error in the executor log, but let me run
>> again to make sure.
>>
>> @Gerard Thanks much! But would your answer on .collect() change
>> depending on running the spark app in client vs cluster mode?
>>
>> Thanks!
>>
>> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> The general answer to your initial question is that "it depends". If
>>> the operation in the rdd.foreach() closure can be parallelized, then
>>> you don't need to collect first. If it needs some local context (e.g.
>>> a socket connection), then you need to do rdd.collect() first to bring
>>> the data locally, which has a performance penalty and is also
>>> restricted by the memory size of the driver process.
>>>
>>> Given the further clarification:
>>> > Reads from Kafka and outputs to Kafka. So I check the output from
>>> > Kafka.
>>>
>>> If it's writing to Kafka, that operation can be done in a distributed
>>> form.
>>>
>>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>>
>>> Or, if you can upgrade to Spark 2.2, you can pave your way to
>>> migrating to Structured Streaming by already adopting the 'structured'
>>> APIs within Spark Streaming:
>>>
>>> case class KV(key: String, value: String)
>>>
>>> dstream.map(...).reduce(...).foreachRDD { rdd =>
>>>   import spark.implicits._
>>>   // needs to be in a (key, value) shape
>>>   val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
>>>   val dataFrame = kv.toDF()
>>>   dataFrame.write
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>     .option("topic", "topic1")
>>>     .save()
>>> }
>>>
>>> -kr, Gerard.
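To spell out why the first version always prints "{}": the lambda passed to stringLongJavaPairRDD.foreach() is serialized by Spark and shipped to the executors, and the HashMap it captures is serialized along with it. Each executor therefore fills its own deserialized copy of the map, while the driver's original map, the one gson.toJson() reads, is never modified. The per-element println calls do run, but on the executors (in local mode they share the driver's console, which is why the values are visible). The following is a minimal, Spark-free sketch of that serialization round-trip; the class, interface, and key names are illustrative, not from the thread:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ClosureCopyDemo {

    // A serializable lambda target, standing in for a Spark foreach closure.
    interface SerializableTask extends Runnable, Serializable {}

    static Map<String, Long> runDemo() throws Exception {
        Map<String, Long> driverMap = new HashMap<>();

        // The closure captures driverMap, just like map.put(...) inside
        // stringLongJavaPairRDD.foreach(...) captures the driver's map.
        SerializableTask task = () -> driverMap.put("key1", 42L);

        // Spark serializes the closure and sends it to an executor; here we
        // simulate that with an in-memory serialize/deserialize round-trip.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        SerializableTask executorCopy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            executorCopy = (SerializableTask) in.readObject();
        }

        // The "executor" runs the copy: it fills its own deserialized map.
        executorCopy.run();

        // The driver's map was never touched.
        return driverMap;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("driver map after remote run: " + runDemo()); // prints {}
    }
}
```

So when a single aggregated message must be built on the driver, bringing the data back (collect(), collectAsMap(), or toLocalIterator()) is unavoidable; the alternative is to restructure the output so each executor sends its own records, e.g. via foreachPartition, as Gerard's distributed-write suggestion implies.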
>>>
>>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Reads from Kafka and outputs to Kafka. So I check the output from
>>>> Kafka.
>>>>
>>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>>> richard.q...@capitalone.com> wrote:
>>>>
>>>>> Where do you check the output result for both cases?
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>>>>> >
>>>>> > Hi All,
>>>>> >
>>>>> > I have a simple stateless transformation using DStreams (I am stuck
>>>>> > with the old API for one of the applications). The pseudo code is
>>>>> > roughly like this:
>>>>> >
>>>>> > dstream.map().reduce().foreachRDD(rdd -> {
>>>>> >     rdd.collect().forEach(); // Is this necessary? It executes fine
>>>>> >                              // but is a bit slow.
>>>>> > })
>>>>> >
>>>>> > I understand collect() brings the results back to the driver, but
>>>>> > is that necessary? Can I just do something like below? I believe I
>>>>> > tried both, and somehow the code below didn't output any results
>>>>> > (it could be an issue with my environment; I am not entirely sure),
>>>>> > but I would like some clarification on .collect(), since it seems
>>>>> > to slow things down for me.
>>>>> >
>>>>> > dstream.map().reduce().foreachRDD(rdd -> {
>>>>> >     rdd.forEach(() -> {});
>>>>> > })
>>>>> >
>>>>> > Thanks!
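A small aside on the reduce step in the pipeline above: reduceByKey((v1, v2) -> v1 + v2) is just a per-key sum over the window, and because addition is associative and commutative, Spark can compute partial sums per partition on the executors and merge them without any driver involvement. A local, Spark-free sketch of the same aggregation (class name and symbols are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SumByKeyDemo {

    // The same per-key sum that reduceByKey((v1, v2) -> v1 + v2) computes
    // across the window, done here on a local list of (symbol, quantity).
    static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.summingLong(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<String, Long> totals = sumByKey(List.of(
                Map.entry("AAPL", 10L),
                Map.entry("GOOG", 5L),
                Map.entry("AAPL", 7L)));
        System.out.println(totals.get("AAPL")); // prints 17
    }
}
```

The expensive part of the working version is therefore not the aggregation itself but the final collectAsMap(), which funnels every key-value pair back to the driver so a single JSON document can be built.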