@Richard I don't see any error in the executor log but let me run again to make sure.
@Gerard Thanks much! But would your answer on .collect() change depending on whether the Spark app runs in client vs. cluster mode? Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> The general answer to your initial question is that "it depends". If the
> operation in the rdd.foreach() closure can be parallelized, then you don't
> need to collect first. If it needs some local context (e.g. a socket
> connection), then you need to do rdd.collect first to bring the data
> locally, which has a perf penalty and is also limited by the memory
> available to the driver process.
>
> Given the further clarification:
> > Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> If it's writing to Kafka, that operation can be done in a distributed
> form.
>
> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>
> Or, if you can upgrade to Spark 2.2, you can pave your way toward
> Structured Streaming by already adopting the 'structured' APIs within
> Spark Streaming:
>
> case class KV(key: String, value: String)
>
> dstream.map(...).reduce(...).foreachRDD { rdd =>
>   import spark.implicits._
>   // needs to be in a (key, value) shape
>   val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
>   val dataFrame = kv.toDF()
>   dataFrame.write
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>     .option("topic", "topic1")
>     .save()
> }
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <richard.q...@capitalone.com> wrote:
>>
>>> Where do you check the output result in both cases?
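Gerard's point about "local context (e.g. a socket connection)" is the usual motivation for Spark's foreachPartition pattern: open one connection per partition on the executor instead of collecting everything to the driver. A minimal sketch of that pattern in plain Java, where a hypothetical FakeSink stands in for a Kafka producer or socket, and ordinary lists stand in for RDD partitions (both are illustrative stand-ins, not Spark APIs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerPartitionWrite {
    // Hypothetical sink standing in for a Kafka producer or socket connection.
    static class FakeSink {
        final List<String> written = new ArrayList<>();
        void send(String record) { written.add(record); }
        void close() { /* release the connection */ }
    }

    // The foreachPartition pattern: one connection per partition, created
    // where the data lives, instead of rdd.collect() on the driver.
    static List<String> writeAllPartitions(List<List<String>> partitions) {
        List<String> allWritten = new ArrayList<>();
        for (List<String> partition : partitions) { // in Spark: rdd.foreachPartition(iter -> ...)
            FakeSink sink = new FakeSink();         // opened once per partition, on the executor
            for (String record : partition) {
                sink.send(record);
            }
            sink.close();
            allWritten.addAll(sink.written);
        }
        return allWritten;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(writeAllPartitions(partitions)); // [a, b, c]
    }
}
```

The design point is that the connection is not serializable, so it must be created inside the per-partition closure rather than on the driver.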
>>>
>>> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I have a simple stateless transformation using DStreams (stuck with
>>> > the old API for one of the applications). The pseudocode is roughly
>>> > like this:
>>> >
>>> > dstream.map(...).reduce(...).foreachRDD(rdd -> {
>>> >   rdd.collect().forEach(...); // Is this necessary? It executes fine, but a bit slowly.
>>> > });
>>> >
>>> > I understand collect brings the results back to the driver, but is
>>> > that necessary? Can I just do something like the code below? I believe
>>> > I tried both, and somehow the code below didn't output any results (it
>>> > could be an issue with my env, I am not entirely sure), but I would
>>> > like some clarification on .collect(), since it seems to slow things
>>> > down for me.
>>> >
>>> > dstream.map(...).reduce(...).foreachRDD(rdd -> {
>>> >   rdd.foreach(...);
>>> > });
>>> >
>>> > Thanks!
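A likely reason the second variant appeared to produce no output: the closure passed to rdd.foreach() is shipped to the executors, so anything it prints goes to executor stdout, not the driver console, whereas rdd.collect().forEach(...) runs entirely on the driver. A small plain-Java sketch of that semantic difference, with a list standing in for the RDD and hypothetical per-JVM log buffers standing in for driver and executor stdout (this only models where the side effects land, not Spark's execution itself):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ForeachVsCollect {
    // Hypothetical per-JVM log buffers: in Spark, println inside
    // rdd.foreach() lands in executor stdout, not the driver console.
    static final List<String> driverLog = new ArrayList<>();
    static final List<String> executorLog = new ArrayList<>();

    // rdd.collect().forEach(...): data is pulled to the driver first,
    // so the side effects happen in the driver JVM.
    static void collectThenForEach(List<String> rdd) {
        for (String record : rdd) {
            driverLog.add(record);
        }
    }

    // rdd.foreach(...): the closure runs on the executors,
    // so the side effects happen in the executor JVMs.
    static void distributedForeach(List<String> rdd) {
        for (String record : rdd) {
            executorLog.add(record);
        }
    }

    public static void main(String[] args) {
        distributedForeach(Arrays.asList("a", "b", "c"));
        // The driver sees nothing, even though the records were processed.
        System.out.println("driver log: " + driverLog);
        System.out.println("executor log: " + executorLog);
    }
}
```

So when the output is an external system like Kafka, the distributed foreach did its work either way; checking the driver console just isn't where the evidence shows up.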