@Richard I have pasted the two versions of the code below, and I still
can't figure out why it doesn't work without .collect. Any help would
be great.

*The code below doesn't work, and sometimes I also run into OutOfMemory errors:*

    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        Map<String, Long> map = new HashMap<>();
        stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
            // Works: I can see values getting printed out
            System.out.println(stringLongTuple2._1());
            // Works: I can see values getting printed out
            System.out.println(stringLongTuple2._2());
            map.put(stringLongTuple2._1(), stringLongTuple2._2());
        });

        // Always prints the empty JSON document "{}". But why? Especially
        // when the map is getting filled with values, as confirmed by the
        // print statements above.
        System.out.println(gson.toJson(map));

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });
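
A likely explanation for the empty map, offered as a sketch rather than a diagnosis of this exact job: Spark serializes the closure passed to rdd.foreach() and runs it inside executor tasks, so the map captured by that closure is a deserialized copy. The put() calls fill the copy, while the map that the driver later passes to gson.toJson() is never touched. The plain-Java example below imitates this without Spark. The names ClosureCopyDemo, FillMapTask, and shipToExecutor, and the sample key "AAPL", are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class ClosureCopyDemo {

    /** Stands in for the lambda passed to rdd.foreach(): it captures a map and fills it. */
    public static class FillMapTask implements Serializable {
        private static final long serialVersionUID = 1L;
        public final HashMap<String, Long> captured;

        public FillMapTask(HashMap<String, Long> captured) {
            this.captured = captured;
        }

        public void run(String key, long value) {
            captured.put(key, value);
        }
    }

    /**
     * Serializes and deserializes the task. This imitates shipping a closure
     * to another JVM; it is not Spark's actual code path.
     */
    public static FillMapTask shipToExecutor(FillMapTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FillMapTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Long> driverMap = new HashMap<>();

        // The "executor" receives a copy of the task, including a copy of the map.
        FillMapTask executorTask = shipToExecutor(new FillMapTask(driverMap));
        executorTask.run("AAPL", 100L);

        System.out.println("executor copy: " + executorTask.captured); // {AAPL=100}
        System.out.println("driver map:    " + driverMap);             // {}
    }
}
```

If the same thing happens in the streaming job, the print statements inside foreach() show values because they run next to the copy, and the driver-side map stays empty unless the data is brought back explicitly (for example with collect or collectAsMap) or the Kafka write is moved into the executor-side code.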




*The code below works, but it is slow because of .collectAsMap:*

    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
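    .foreachRDD(stringLongJavaPairRDD -> {
        // NOTE: this line is cut off in the original message; the call to
        // collectAsMap() is assumed from the description above.
        LinkedHashMap<String, Long> map =
            new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });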




On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Kant,
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
> No, it should make no difference.
> -kr, Gerard.
On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:
>> @Richard I don't see any error in the executor log but let me run again
>> to make sure.
>> @Gerard Thanks much!  but would your answer on .collect() change
>> depending on running the spark app in client vs cluster mode?
>> Thanks!
On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>> The general answer to your initial question is "it depends". If the
>>> operation in the rdd.foreach() closure can be parallelized, then you don't
>>> need to collect first. If it needs some local context (e.g. a socket
>>> connection), then you need to call rdd.collect() first to bring the data
>>> to the driver, which has a performance penalty and is also limited by the
>>> memory available to the driver process.
>>> Given the further clarification:
>>> >Reads from Kafka and outputs to Kafka. so I check the output from
>>> Kafka.
>>> If it's writing to Kafka, that operation can be done in a distributed
>>> form.
>>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>> Or, if you can upgrade to Spark 2.2, you can pave the way for a
>>> migration to Structured Streaming by already adopting the 'structured'
>>> APIs within Spark Streaming:
>>> case class KV(key: String, value: String)
>>> dstream.map().reduce().foreachRDD { rdd =>
>>>     import spark.implicits._
>>>     // needs to be in a (key, value) shape
>>>     val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
>>>     val dataFrame = kv.toDF()
>>>     dataFrame.write
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>>         .option("topic", "topic1")
>>>         .save()
>>> }
>>> -kr, Gerard.
On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>>>> It reads from Kafka and outputs to Kafka, so I check the output from Kafka.
>>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <richard.q...@capitalone.com> wrote:
>>>>> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>>>>> >
>>>>> > Hi All,
>>>>> >
>>>>> > I have a simple stateless transformation using DStreams (stuck with
>>>>> > the old API for one of the applications). The pseudocode is roughly
>>>>> > like this:
>>>>> >
>>>>> > dstream.map().reduce().foreachRDD(rdd -> {
>>>>> >      rdd.collect().forEach(); // Is this necessary? It executes fine, but is a bit slow
>>>>> > })
>>>>> >
>>>>> > I understand that collect() brings the results back to the driver, but
>>>>> > is that necessary? Can I just do something like the code below? I
>>>>> > believe I tried both, and somehow the code below didn't output any
>>>>> > results (it could be an issue with my environment; I am not entirely
>>>>> > sure). I would just like some clarification on .collect(), since it
>>>>> > seems to slow things down for me.
>>>>> >
>>>>> > dstream.map().reduce().foreachRDD(rdd -> {
>>>>> >      rdd.foreach(x -> {});
>>>>> > })
>>>>> >
>>>>> > Thanks!
>>>>> >
>>>>> >
