@Richard I had pasted the two versions of the code below and I still
couldn't figure out why it wouldn't work without .collect ?  Any help would
be great


*The code below doesn't work and sometime I also run into OutOfMemory
error.*

jsonMessagesDStream
    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long uuantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        Map<String, Long> map = new HashMap<>();
        stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

            *System.out.println(stringLongTuple2._1()); // Works I can
see values getting printed out*

*            System.out.println(stringLongTuple2._2()); // Works I can
see values getting printed out*

            map.put(stringLongTuple2._1(), stringLongTuple2._2())

        });

        *System.out.println(gson.toJson(map));* // Prints empty json
doc string "{}" always. But why? especially

        // when the map is getting filled values as confirmed by the
print statements above

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

    jssc.start();

    jssc.awaitTermination();


                          VS

*The below code works but it is slow because .collectAsMap*


jsonMessagesDStream
    .window(new Duration(60000), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long uuantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {

        LinkedHashMap<String, Long> map = new
LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

    });

    jssc.start();

    jssc.awaitTermination();




On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Kant,
>
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
>
> No, it should make no difference.
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> @Richard I don't see any error in the executor log but let me run again
>> to make sure.
>>
>> @Gerard Thanks much!  but would your answer on .collect() change
>> depending on running the spark app in client vs cluster mode?
>>
>> Thanks!
>>
>> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> The general answer to your initial question is that "it depends". If the
>>> operation in the rdd.foreach() closure can be parallelized, then you don't
>>> need to collect first. If it needs some local context (e.g. a socket
>>> connection), then you need to do rdd.collect first to bring the data
>>> locally, which has a perf penalty and also is restricted to the memory size
>>> to the driver process.
>>>
>>> Given the further clarification:
>>> >Reads from Kafka and outputs to Kafka. so I check the output from
>>> Kafka.
>>>
>>> If it's writing to Kafka, that operation can be done in a distributed
>>> form.
>>>
>>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>>
>>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>>> migrate to structured streaming by already adopting the 'structured' APIs
>>> within Spark Streaming:
>>>
>>> case class KV(key: String, value: String)
>>>
>>> dstream.map().reduce().forEachRdd{rdd ->
>>>     import spark.implicits._
>>>     val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs
>>> to be in a (key,value) shape
>>>     val dataFrame = rdd.toDF()
>>>     dataFrame.write
>>>                      .format("kafka")
>>>                      .option("kafka.bootstrap.servers",
>>> "host1:port1,host2:port2")
>>>                      .option("topic", "topic1")
>>>                      .save()
>>> }
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>>
>>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>>> richard.q...@capitalone.com> wrote:
>>>>
>>>>> Where do you check the output result for both case?
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>>
>>>>> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>>>>> >
>>>>> > Hi All,
>>>>> >
>>>>> > I have a simple stateless transformation using Dstreams (stuck with
>>>>> the old API for one of the Application). The pseudo code is rough like 
>>>>> this
>>>>> >
>>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>>> >      rdd.collect(),forEach(); // Is this necessary ? Does execute
>>>>> fine but a bit slow
>>>>> > })
>>>>> >
>>>>> > I understand collect collects the results back to the driver but is
>>>>> that necessary? can I just do something like below? I believe I tried both
>>>>> and somehow the below code didn't output any results (It can be issues 
>>>>> with
>>>>> my env. I am not entirely sure) but I just would like some clarification 
>>>>> on
>>>>> .collect() since it seems to slow things down for me.
>>>>> >
>>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>>> >      rdd.forEach(() -> {} ); //
>>>>> > })
>>>>> >
>>>>> > Thanks!
>>>>> >
>>>>> >
>>>>> ________________________________________________________
>>>>>
>>>>> The information contained in this e-mail is confidential and/or
>>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>>> solely in performance of work or services for Capital One. The information
>>>>> transmitted herewith is intended only for use by the individual or entity
>>>>> to which it is addressed. If the reader of this message is not the 
>>>>> intended
>>>>> recipient, you are hereby notified that any review, retransmission,
>>>>> dissemination, distribution, copying or other use of, or taking of any
>>>>> action in reliance upon this information is strictly prohibited. If you
>>>>> have received this communication in error, please contact the sender and
>>>>> delete the material from your computer.
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to