Hi Richard,

I have tried your sample code, both now and several times in the past. The
problem seems to be that the kafkaProducer object is not serializable, so I
get a "Task not serializable" exception. My kafkaProducer object is created
using the following jar:

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
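
For reference, a common way around this kind of error is to stop capturing the producer in the closure and instead create it lazily on whichever JVM runs the task. Below is a minimal JDK-only sketch of that idea, not the Spark or Kafka API: FakeProducer, SerializableTask, and ProducerHolder are hypothetical stand-ins (for KafkaProducer, Spark's serializable function interfaces, and a per-executor singleton respectively), and the serialization check only approximates what Spark does when it ships a closure.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class LazyProducerSketch {

    /** Hypothetical stand-in for KafkaProducer: deliberately NOT Serializable. */
    static class FakeProducer {
        final List<String> sent = new ArrayList<>();
        void send(String topic, String value) { sent.add(topic + ":" + value); }
    }

    /** Hypothetical stand-in for a serializable Spark function interface. */
    interface SerializableTask extends Serializable {
        void call(String value);
    }

    /** One producer per JVM, created on first use by the JVM that runs the task. */
    static class ProducerHolder {
        private static FakeProducer instance;
        static synchronized FakeProducer get() {
            if (instance == null) {
                instance = new FakeProducer();
            }
            return instance;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Captures a producer created outside the task: cannot be serialized. */
    static SerializableTask capturingTask() {
        FakeProducer producer = new FakeProducer();
        return value -> producer.send("topicA", value);
    }

    /** Captures nothing: the producer is looked up when the task runs. */
    static SerializableTask holderTask() {
        return value -> ProducerHolder.get().send("topicA", value);
    }

    public static void main(String[] args) {
        System.out.println("capturing task serializable: " + isSerializable(capturingTask()));
        System.out.println("holder task serializable:    " + isSerializable(holderTask()));
    }
}
```

In a real job the holder would build an actual KafkaProducer from its configuration properties; that wiring is omitted here because it depends on the cluster setup.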

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard <richard.q...@capitalone.com>
wrote:

> Thanks for sharing the code.
>
> The first problem, in the first version of the code, is that the map is
> allocated in the driver, but the code tries to put data into it on the
> executors and then read it back in the driver to send to Kafka.
>
> You are using this map as if it were an accumulator, but it doesn’t work
> that way.
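
A minimal JDK-only sketch of why the driver-side map stays empty, under the assumption that a task reaches an executor as a serialized copy of the closure and everything it captured. PutTask and shipToExecutor are hypothetical names used only for illustration; Spark's real task shipping is more involved, but the copy semantics are the point here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class DriverMapCopySketch {

    /** Hypothetical stand-in for the foreach lambda: it carries the map it captured. */
    static class PutTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<String, Long> map;
        PutTask(Map<String, Long> map) { this.map = map; }
        void call(String key, long value) { map.put(key, value); }
    }

    /** Round-trips the task through Java serialization, imitating a trip to an executor. */
    static PutTask shipToExecutor(PutTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PutTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> driverMap = new HashMap<>();
        PutTask executorTask = shipToExecutor(new PutTask(driverMap));
        executorTask.call("AAPL", 10L);                 // mutates the executor's copy only
        System.out.println("executor copy: " + executorTask.map);
        System.out.println("driver map:    " + driverMap);
    }
}
```

The copy receives the entry while the original map is untouched, which matches the empty "{}" seen on the driver even though the print statements inside the closure show values.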
>
>
>
> The second problem is that both versions collect RDD-level data to
> generate a single JSON string and then send it to Kafka, which could produce
> a very long JSON string if your TPS is very high.
>
> If possible, you can send smaller JSON strings at the task level, such as:
>
>     .foreachRDD(stringLongJavaPairRDD -> {
>
>       stringLongJavaPairRDD.foreachPartition(partition -> {
>
>           Map<String, Long> map = new HashMap<>(); // defined inside the task
>
>           // partition is a java.util.Iterator over this partition's tuples
>           partition.forEachRemaining(stringLongTuple2 -> {
>             map.put(stringLongTuple2._1(), stringLongTuple2._2());
>           });
>
>           // send a smaller JSON document per task
>           producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>
>       });
>
>     });
>
> When you do this, make sure the Kafka producer (search for "Kafka sink" for
> this) and Gson are set up correctly on the executors.
>
>
>
> If there is still an OOM after this, let’s discuss further.
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Thursday, December 7, 2017 at 2:30 AM
> *To: *Gerard Maas <gerard.m...@gmail.com>
> *Cc: *"Qiao, Richard" <richard.q...@capitalone.com>, "user @spark" <
> user@spark.apache.org>
> *Subject: *Re: Do I need to do .collect inside forEachRDD
>
>
>
> @Richard I have pasted the two versions of the code below, and I still
> can't figure out why it doesn't work without .collect. Any help would
> be great.
>
>
>
>
>
> *The code below doesn't work, and sometimes I also run into an OutOfMemory
> error.*
>
> jsonMessagesDStream
>     .window(new Duration(60000), new Duration(1000))
>     .mapToPair(val -> {
>       JsonParser parser = new JsonParser();
>       JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>       if (jsonObj.has("key4")) {
>         return new Tuple2<>("", 0L);
>       }
>       String symbol = jsonObj.get("key1").getAsString();
>       long quantity = jsonObj.get("key2").getAsLong();
>       return new Tuple2<>(symbol, quantity);
>     })
>     .reduceByKey((v1, v2) -> v1 + v2)
>     .foreachRDD(stringLongJavaPairRDD -> {
>         Map<String, Long> map = new HashMap<>();
>         stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
>
>             // Works: I can see values getting printed out
>             System.out.println(stringLongTuple2._1());
>             System.out.println(stringLongTuple2._2());
>
>             map.put(stringLongTuple2._1(), stringLongTuple2._2());
>
>         });
>
>         // Always prints the empty JSON document "{}". But why? Especially
>         // when the map is getting filled with values, as confirmed by the
>         // print statements above.
>         System.out.println(gson.toJson(map));
>
>         producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>     });
>
>     jssc.start();
>
>     jssc.awaitTermination();
>
>
>
>                           VS
>
> *The code below works, but it is slow because of .collectAsMap*
>
>
>
> jsonMessagesDStream
>     .window(new Duration(60000), new Duration(1000))
>     .mapToPair(val -> {
>       JsonParser parser = new JsonParser();
>       JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>       if (jsonObj.has("key4")) {
>         return new Tuple2<>("", 0L);
>       }
>       String symbol = jsonObj.get("key1").getAsString();
>       long quantity = jsonObj.get("key2").getAsLong();
>       return new Tuple2<>(symbol, quantity);
>     })
>     .reduceByKey((v1, v2) -> v1 + v2)
>     .foreachRDD(stringLongJavaPairRDD -> {
>
>         LinkedHashMap<String, Long> map =
>             new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());
>
>         producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>
>     });
>
>     jssc.start();
>
>     jssc.awaitTermination();
>
>
>
>
>
>
>
> On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
> Hi Kant,
>
>
>
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
>
>
>
> No, it should make no difference.
>
>
>
> -kr, Gerard.
>
>
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:
>
> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
>
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
>
>
> Thanks!
>
>
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
> The general answer to your initial question is "it depends". If the
> operation in the rdd.foreach() closure can be parallelized, then you don't
> need to collect first. If it needs some local context (e.g. a socket
> connection), then you need to call rdd.collect() first to bring the data
> to the driver, which has a performance penalty and is also limited by the
> memory size of the driver process.
>
>
>
> Given the further clarification:
>
> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
>
>
> If it's writing to Kafka, that operation can be done in a distributed
> form.
>
>
>
> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>
>
>
> Or, if you can upgrade to Spark 2.2, you can pave the way for migrating to
> Structured Streaming by already adopting the 'structured' APIs within
> Spark Streaming:
>
>
>
> case class KV(key: String, value: String)
>
>
>
> dstream.map().reduce().foreachRDD { rdd =>
>
>     import spark.implicits._
>
>     // needs to be in a (key, value) shape
>     val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
>
>     val dataFrame = kv.toDF()
>
>     dataFrame.write
>              .format("kafka")
>              .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>              .option("topic", "topic1")
>              .save()
>
> }
>
>
>
> -kr, Gerard.
>
>
>
>
>
>
>
> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
>
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <richard.q...@capitalone.com>
> wrote:
>
> Where do you check the output result for both case?
>
> Sent from my iPhone
>
>
>
>
> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
> >
> > Hi All,
> >
> > I have a simple stateless transformation using DStreams (stuck with the
> old API for one of the applications). The pseudocode is roughly like this:
> >
> > dstream.map().reduce().forEachRdd(rdd -> {
> >      rdd.collect().forEach(); // Is this necessary? It executes fine
> but is a bit slow
> > })
> >
> > I understand that collect brings the results back to the driver, but is
> that necessary? Can I just do something like the code below? I believe I
> tried both, and somehow the code below didn't output any results (it could
> be an issue with my env; I am not entirely sure), but I would just like
> some clarification on .collect(), since it seems to slow things down for me.
> >
> > dstream.map().reduce().forEachRdd(rdd -> {
> >      rdd.forEach(() -> {} ); //
> > })
> >
> > Thanks!
> >
> >
>
> ________________________________________________________
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
