Spark Dataset / DataFrame has foreachPartition() as well. Its implementation is much more efficient than RDD's. There are tons of code snippets, for example https://github.com/hdinsight/spark-streaming-data-persistence-examples/blob/master/src/main/scala/com/microsoft/spark/streaming/examples/common/DataFrameExtensions.scala
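One caveat for the streaming case in the question: in Spark 2.2, actions such as foreachPartition cannot be called on a Dataset created with readStream. Spark rejects them with an error saying that queries with streaming sources must be executed with writeStream.start(). For a streaming query the per-partition hook is ForeachWriter. The Task Serialization error comes from building the KafkaProducer on the driver and passing it in. The writer is serialized and shipped to the executors, and KafkaProducer is not serializable. The sketch below assumes string values and kafka-clients on the classpath. It keeps only the broker list and topic in the writer and creates the producer in open(), which runs on the executor. The class name KafkaForeachWriter, the topic and the non-empty-value condition are hypothetical placeholders.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.ForeachWriter

// Hypothetical writer: only the broker list and topic (plain Strings) are
// serialized and shipped to the executors together with the query.
class KafkaForeachWriter(brokers: String, topic: String) extends ForeachWriter[String] {

  // Never serialized; stays null until open() runs on the executor.
  @transient private var producer: KafkaProducer[String, String] = _

  // Called on the executor for each partition of each trigger.
  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    producer = new KafkaProducer[String, String](props)
    true // true = go on and process the rows of this partition
  }

  override def process(value: String): Unit = {
    // Placeholder condition: replace with the real per-row rule.
    if (value != null && value.nonEmpty) {
      producer.send(new ProducerRecord[String, String](topic, value))
    }
  }

  // Spark also calls close() when open() returned false, so the producer
  // may still be null here.
  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) {
      producer.close() // blocks until previously sent records complete
      producer = null
    }
  }
}
```

It would be wired in as `df.writeStream.foreach(new KafkaForeachWriter("localhost:9092", "output")).start()`, where `df` is the `Dataset[String]` from the question and "output" is a made-up topic. Opening a producer per partition and trigger is simple but not free. If the goal is only conditional forwarding to Kafka, the built-in Kafka sink Silvio points to avoids writing a ForeachWriter at all.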
On Mon, Dec 18, 2017 at 3:07 PM, Liana Napalkova <liana.napalk...@eurecat.org> wrote:

> I need to first read from a Kafka queue into a DataFrame. Then I should
> perform some transformations on the data. Finally, for each row in the
> DataFrame I should conditionally use a KafkaProducer to send some
> data to Kafka.
>
> So, I am both consuming data from Kafka and producing data to Kafka.
>
> ------------------------------
> From: Silvio Fiorito <silvio.fior...@granturing.com>
> Sent: 18 December 2017 16:00:39
> To: Liana Napalkova; user@spark.apache.org
> Subject: Re: How to properly execute `foreachPartition` in Spark 2.2
>
> Why don’t you just use the Kafka sink for Spark 2.2?
>
> https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
>
> From: Liana Napalkova <liana.napalk...@eurecat.org>
> Date: Monday, December 18, 2017 at 9:45 AM
> To: "user@spark.apache.org" <user@spark.apache.org>
> Subject: How to properly execute `foreachPartition` in Spark 2.2
>
> Hi,
>
> I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I
> explain the problem in detail. I appreciate any help.
>
> In Spark 1.6 I was doing something similar to this:
>
>     DstreamFromKafka.foreachRDD(session => {
>       session.foreachPartition { partitionOfRecords =>
>         println("Setting the producer.")
>         val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
>                                             mySet.value("batchSize"),
>                                             mySet.value("lingerMS"))
>         partitionOfRecords.foreach(s => {
>           //...
>
> However, I cannot find the proper way to do the same thing in Spark
> 2.2. I tried to write my own class by extending `ForeachWriter`, but I get
> a Task Serialization error when passing `KafkaProducer`.
>
>     class MyTestClass(
>     // val inputparams: String)
>     extends Serializable
>     {
>       val spark = SparkSession
>         .builder()
>         .appName("TEST")
>         //.config("spark.sql.warehouse.dir", kafkaData)
>         .enableHiveSupport()
>         .getOrCreate()
>
>       import spark.implicits._
>
>       val df: Dataset[String] = spark.readStream
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test")
>         .option("startingOffsets", "latest")
>         .option("failOnDataLoss", "true")
>         .load()
>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] // Kafka sends bytes
>         .map(_._2)
>
>       val producer = // create KafkaProducer
>
>       val writer = new MyForeachWriter(producer: KafkaProducer[String,String])
>
>       val query = df
>         .writeStream
>         .foreach(writer)
>         .start
>
>       query.awaitTermination()
>
>       spark.stop()
>
>     class MyForeachWriter extends ForeachWriter[String] with Serializable {
>
>       var producer: KafkaProducer[String,String] = _
>
>       def this(producer: KafkaProducer[String,String])
>       {
>         this()
>         this.producer = producer
>       }
>
>       override def process(row: String): Unit =
>       {
>         // ...
>       }
>
>       override def close(errorOrNull: Throwable): Unit = {}
>
>       override def open(partitionId: Long, version: Long): Boolean = {
>         true
>       }
>
>     }
>
> Liana Napalkova, PhD
> Big Data Analytics Unit
>
> T +34 93 238 14 00 (ext. 1248)
> M +34 633 426 677
> liana.napalk...@eurecat.org
>
> Carrer Camí Antic de València 54-56, Edifici A - 08005 - Barcelona
> www.eurecat.org
>
> Ascamm, BDigital, Barcelona Media and Cetemmsa are now Eurecat
>
> ------------------------------
> DISCLAIMER: Privileged/Confidential Information may be contained in this
> message. If you are not the addressee indicated in this message you should
> destroy this message, and notify us immediately to the following address:
> le...@eurecat.org. If the addressee of this message does not consent to
> the use of Internet e-mail and message recording, please notify us
> immediately.
> ------------------------------
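Silvio's suggestion in the thread also covers the flow Liana describes. The conditional per-row send can be expressed as a filter placed before the built-in Kafka sink, which ships in the spark-sql-kafka-0-10 package, so no producer is managed by hand. The sketch below assumes string keys and values. The object name KafkaToKafka, the topic names, the checkpoint directory and the non-empty-value condition are hypothetical. The transformation sits in its own method so it can be checked on an ordinary batch DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery

object KafkaToKafka {

  // Keeps only the rows to forward and shapes them into the string "key" and
  // "value" columns the Kafka sink reads. The non-empty check is a placeholder.
  def toKafkaRecords(df: DataFrame): DataFrame =
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .filter(col("value").isNotNull && col("value") =!= "")

  // Reads from inTopic, applies the transformation and writes the result to
  // outTopic through the built-in Kafka sink.
  def run(spark: SparkSession, brokers: String, inTopic: String,
          outTopic: String, checkpointDir: String): StreamingQuery = {
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", inTopic)
      .option("startingOffsets", "latest")
      .load()

    toKafkaRecords(input).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", outTopic)
      // needed unless spark.sql.streaming.checkpointLocation is set globally
      .option("checkpointLocation", checkpointDir)
      .start()
  }
}
```

With the `spark` session from the question, `KafkaToKafka.run(spark, "localhost:9092", "test", "output", "/tmp/kafka-to-kafka-checkpoint").awaitTermination()` would start the query. "output" and the checkpoint path are made-up values.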