Hi Richard, I tried your sample code just now, and several times in the past as well. The problem seems to be that kafkaProducer is not serializable, so I get a "Task not serializable" exception. My kafkaProducer object is created using the following jar:

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
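(A common fix for the "Task not serializable" problem above is the "kafka sink" pattern Richard mentions later in the thread: never ship the producer inside a closure, and instead create one lazily per executor JVM. The sketch below is only an illustration of that idea; LazyKafkaSink is a hypothetical name and the broker address is a placeholder, not from this thread.)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Hypothetical helper: one KafkaProducer per executor JVM. Because the
    // producer lives in a static field and is created on first use, it is
    // never captured by a Spark closure and never needs to be serializable.
    public final class LazyKafkaSink {
        private static KafkaProducer<String, String> producer;

        public static synchronized KafkaProducer<String, String> get() {
            if (producer == null) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "host1:port1"); // placeholder brokers
                props.put("key.serializer", StringSerializer.class.getName());
                props.put("value.serializer", StringSerializer.class.getName());
                producer = new KafkaProducer<>(props);
            }
            return producer;
        }

        private LazyKafkaSink() {}
    }

Inside foreachPartition on the executors you would then call LazyKafkaSink.get().send(...) rather than referencing a producer built on the driver.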
On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard <richard.q...@capitalone.com> wrote:

> Thanks for sharing the code.
>
> The 1st problem in the first code is that the map is allocated on the driver, but the code tries to fill it on the executors and then read it back on the driver to send to Kafka. You are using this map as if it had an accumulator's semantics, but it doesn't work that way: each task gets its own serialized copy of the map, so the driver's copy stays empty.
>
> The 2nd problem is that both versions collect RDD-level data to build a single JSON string to send to Kafka, which could produce a very long JSON string if your TPS is very high. If possible, send smaller JSON strings at the task level, such as:
>
>     .foreachRDD(stringLongJavaPairRDD ->
>         stringLongJavaPairRDD.foreachPartition(partition -> {
>             Map<String, Long> map = new HashMap<>(); // defined in a task
>             partition.forEachRemaining(stringLongTuple2 ->
>                 map.put(stringLongTuple2._1(), stringLongTuple2._2()));
>             // send a smaller json per task
>             producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>         }));
>
> When you do this, make sure the kafka producer (look up "kafka sink" for it) and gson are set up correctly on the executors.
>
> If there is still an OOM after this, let's discuss further.
>
> Best Regards
> Richard
>
> From: kant kodali <kanth...@gmail.com>
> Date: Thursday, December 7, 2017 at 2:30 AM
> To: Gerard Maas <gerard.m...@gmail.com>
> Cc: "Qiao, Richard" <richard.q...@capitalone.com>, "user @spark" <user@spark.apache.org>
> Subject: Re: Do I need to do .collect inside forEachRDD
>
> @Richard I pasted the two versions of the code below and I still couldn't figure out why it wouldn't work without .collect. Any help would be great.
>
> The code below doesn't work, and sometimes I also run into an OutOfMemory error:
>
>     jsonMessagesDStream
>         .window(new Duration(60000), new Duration(1000))
>         .mapToPair(val -> {
>             JsonParser parser = new JsonParser();
>             JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>             if (jsonObj.has("key4")) {
>                 return new Tuple2<>("", 0L);
>             }
>             String symbol = jsonObj.get("key1").getAsString();
>             long quantity = jsonObj.get("key2").getAsLong();
>             return new Tuple2<>(symbol, quantity);
>         })
>         .reduceByKey((v1, v2) -> v1 + v2)
>         .foreachRDD(stringLongJavaPairRDD -> {
>             Map<String, Long> map = new HashMap<>();
>             stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
>                 System.out.println(stringLongTuple2._1()); // Works: I can see values getting printed out
>                 System.out.println(stringLongTuple2._2()); // Works: I can see values getting printed out
>                 map.put(stringLongTuple2._1(), stringLongTuple2._2());
>             });
>             // The line below always prints the empty json doc string "{}". But why,
>             // especially when the map is getting filled with values, as confirmed by the print statements above?
>             System.out.println(gson.toJson(map));
>             producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>         });
>
>     jssc.start();
>     jssc.awaitTermination();
>
> VS
>
> The code below works, but it is slow because of .collectAsMap:
>
>     jsonMessagesDStream
>         .window(new Duration(60000), new Duration(1000))
>         .mapToPair(val -> {
>             JsonParser parser = new JsonParser();
>             JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>             if (jsonObj.has("key4")) {
>                 return new Tuple2<>("", 0L);
>             }
>             String symbol = jsonObj.get("key1").getAsString();
>             long quantity = jsonObj.get("key2").getAsLong();
>             return new Tuple2<>(symbol, quantity);
>         })
>         .reduceByKey((v1, v2) -> v1 + v2)
>         .foreachRDD(stringLongJavaPairRDD -> {
>             LinkedHashMap<String, Long> map =
>                 new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());
>             producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
>         });
>
>     jssc.start();
>     jssc.awaitTermination();
>
> On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
> Hi Kant,
>
> > but would your answer on .collect() change depending on running the spark app in client vs cluster mode?
>
> No, it should make no difference.
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:
>
> @Richard I don't see any error in the executor log, but let me run it again to make sure.
>
> @Gerard Thanks much! But would your answer on .collect() change depending on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
> The general answer to your initial question is that "it depends". If the operation in the rdd.foreach() closure can be parallelized, then you don't need to collect first. If it needs some local context (e.g. a socket connection), then you need to do rdd.collect first to bring the data to the driver, which has a performance penalty and is also restricted by the memory available to the driver process.
>
> Given the further clarification:
>
> > Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> If it's writing to Kafka, that operation can be done in a distributed form.
>
> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>
> Or, if you can upgrade to Spark 2.2, you can pave your way toward migrating to structured streaming by already adopting the 'structured' APIs within Spark Streaming (a Java sketch of the same idea appears further down this thread):
>
>     case class KV(key: String, value: String)
>
>     dstream.map(...).reduce(...).foreachRDD { rdd =>
>       import spark.implicits._
>       // needs to be in a (key, value) shape
>       val kv = rdd.map(e => KV(extractKey(e), extractValue(e)))
>       val dataFrame = kv.toDF()
>       dataFrame.write
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>         .option("topic", "topic1")
>         .save()
>     }
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
> Reads from Kafka and outputs to Kafka, so I check the output from Kafka.
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <richard.q...@capitalone.com> wrote:
>
> Where do you check the output result in both cases?
>
> Sent from my iPhone
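(Gerard's snippet above is Scala; a hedged Java sketch of the same "structured write" idea, callable from inside foreachRDD, might look like the following. It assumes Spark 2.2+ with the spark-sql-kafka-0-10 package on the classpath; the KV bean, the writeBatchToKafka method name, and the broker addresses are illustrative, not from the thread.)

    import java.io.Serializable;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class KafkaBatchWrite {
        // Simple bean so Spark can derive a schema with "key" and "value"
        // columns, which is the shape the Kafka DataFrame sink expects.
        public static class KV implements Serializable {
            private String key;
            private String value;
            public KV() {}
            public KV(String key, String value) { this.key = key; this.value = value; }
            public String getKey() { return key; }
            public void setKey(String key) { this.key = key; }
            public String getValue() { return value; }
            public void setValue(String value) { this.value = value; }
        }

        // Writes one micro-batch of (key, value) pairs to Kafka with the batch writer.
        static void writeBatchToKafka(SparkSession spark, JavaRDD<Tuple2<String, String>> rdd) {
            Dataset<Row> df = spark.createDataFrame(
                rdd.map(t -> new KV(t._1(), t._2())), KV.class);
            df.write()
              .format("kafka")
              .option("kafka.bootstrap.servers", "host1:port1,host2:port2") // placeholder
              .option("topic", "topic1")
              .save();
        }
    }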
> On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>
> Hi All,
>
> I have a simple stateless transformation using DStreams (I'm stuck with the old API for one of the applications). The pseudo code is roughly like this:
>
>     dstream.map(...).reduce(...).foreachRDD(rdd -> {
>         rdd.collect().forEach(...); // Is this necessary? It does execute fine, but a bit slowly.
>     });
>
> I understand collect brings the results back to the driver, but is that necessary? Can I just do something like below? I believe I tried both, and somehow the code below didn't output any results (it could be an issue with my environment, I am not entirely sure), but I would like some clarification on .collect(), since it seems to slow things down for me.
>
>     dstream.map(...).reduce(...).foreachRDD(rdd -> {
>         rdd.foreach(() -> {});
>     });
>
> Thanks!
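(Pulling the thread's advice together, here is a hedged sketch of the original pseudo code without .collect(): each partition builds its own small map on the executor and sends its own, smaller JSON message. It reuses the hypothetical LazyKafkaSink helper sketched near the top of the thread, creates the Gson instance inside the task so nothing non-serializable is captured, and assumes pairDStream is the JavaPairDStream<String, Long> produced by reduceByKey above.)

    import java.util.HashMap;
    import java.util.Map;
    import com.google.gson.Gson;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    public class PartitionedKafkaSend {
        // No collect(): the work stays distributed, and only small
        // per-partition JSON payloads are sent, directly from the executors.
        static void sendPerPartition(JavaPairDStream<String, Long> pairDStream) {
            pairDStream.foreachRDD(rdd ->
                rdd.foreachPartition(partition -> {
                    Gson gson = new Gson();                  // built on the executor
                    Map<String, Long> map = new HashMap<>(); // task-local, not driver-side
                    partition.forEachRemaining(t -> map.put(t._1(), t._2()));
                    if (!map.isEmpty()) {
                        LazyKafkaSink.get().send(
                            new ProducerRecord<>("topicA", gson.toJson(map)));
                    }
                }));
        }
    }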