Spark's Dataset / DataFrame API has foreachPartition() as well. Its
implementation is much more efficient than the RDD one.
There are plenty of code snippets, for example:
https://github.com/hdinsight/spark-streaming-data-persistence-examples/blob/master/src/main/scala/com/microsoft/spark/streaming/examples/common/DataFrameExtensions.scala
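
For reference, here is a minimal sketch of the pattern (not the code from the link above). It assumes a batch Dataset[String]; on a streaming Dataset in Spark 2.2 the write still has to go through writeStream. The object name, `createProducer`, `shouldSend` and `sendAll` are hypothetical names made up for illustration. The point is that the KafkaProducer is created inside the partition function, on the executor, so only plain strings are captured by the closure and nothing non-serializable is shipped from the driver.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.Dataset

object ForeachPartitionSketch {

  // Hypothetical helper: builds a producer from a plain broker string.
  // It is only ever called on the executors, never on the driver.
  def createProducer(bootstrapServers: String): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  // Hypothetical condition deciding which rows are forwarded to Kafka.
  def shouldSend(value: String): Boolean = value.nonEmpty

  def sendAll(ds: Dataset[String], bootstrapServers: String, topic: String): Unit = {
    // Declaring the function as a typed value selects the Scala overload of
    // foreachPartition unambiguously (the Java-friendly overload also exists).
    val sendPartition: Iterator[String] => Unit = rows => {
      // One producer per partition, created where the task runs.
      val producer = createProducer(bootstrapServers)
      try {
        rows.filter(shouldSend).foreach { v =>
          producer.send(new ProducerRecord[String, String](topic, v))
        }
      } finally {
        // close() waits for previously sent records to complete.
        producer.close()
      }
    }
    ds.foreachPartition(sendPartition)
  }
}
```

Usage would look like ForeachPartitionSketch.sendAll(ds, "localhost:9092", "some-topic"), where the broker address and topic name are placeholders. Creating one producer per partition rather than one per row keeps the connection overhead reasonable.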

On Mon, Dec 18, 2017 at 3:07 PM, Liana Napalkova <
liana.napalk...@eurecat.org> wrote:

> I first need to read from a Kafka queue into a DataFrame. Then I need to
> perform some transformations on the data. Finally, for each row in the
> DataFrame I should conditionally use a KafkaProducer to send some
> data to Kafka.
>
> So I am both consuming data from and producing data to Kafka.
>
>
>
> ------------------------------
> *From:* Silvio Fiorito <silvio.fior...@granturing.com>
> *Sent:* 18 December 2017 16:00:39
> *To:* Liana Napalkova; user@spark.apache.org
> *Subject:* Re: How to properly execute `foreachPartition` in Spark 2.2
>
>
> Why don’t you just use the Kafka sink for Spark 2.2?
>
>
>
> https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
>
>
>
>
>
>
>
> *From: *Liana Napalkova <liana.napalk...@eurecat.org>
> *Date: *Monday, December 18, 2017 at 9:45 AM
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *How to properly execute `foreachPartition` in Spark 2.2
>
>
>
> Hi,
>
>
>
> I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I
> explain the problem in detail. I appreciate any help.
>
>
>
> In Spark 1.6 I was doing something similar to this:
>
>
>
> DstreamFromKafka.foreachRDD(session => {
>         session.foreachPartition { partitionOfRecords =>
>           println("Setting the producer.")
>           val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
>                                               mySet.value("batchSize"),
>                                               mySet.value("lingerMS"))
>           partitionOfRecords.foreach(s => {
>
>              //...
>
>
>
> However, I cannot find the proper way to do a similar thing in Spark
> 2.2. I tried to write my own class by extending `ForeachWriter`, but I get
> a Task Serialization error when passing `KafkaProducer`.
>
> import org.apache.kafka.clients.producer.KafkaProducer
> import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
>
> class MyTestClass(
>                   // val inputparams: String
>                  )
>   extends Serializable
> {
>
>   val spark = SparkSession
>     .builder()
>     .appName("TEST")
>     //.config("spark.sql.warehouse.dir", kafkaData)
>     .enableHiveSupport()
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val df: Dataset[String] = spark.readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "test")
>     .option("startingOffsets", "latest")
>     .option("failOnDataLoss", "true")
>     .load()
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] // Kafka sends bytes
>     .map(_._2)
>
>   val producer = // create KafkaProducer
>
>   val writer = new MyForeachWriter(producer: KafkaProducer[String,String])
>
>   val query = df
>     .writeStream
>     .foreach(writer)
>     .start
>
>   query.awaitTermination()
>
>   spark.stop()
>
> }
>
>
> class MyForeachWriter extends ForeachWriter[String] with Serializable {
>
>   var producer: KafkaProducer[String,String] = _
>
>   def this(producer: KafkaProducer[String,String])
>   {
>     this()
>     this.producer = producer
>   }
>
>   override def process(row: String): Unit =
>   {
>     // ...
>   }
>
>   override def close(errorOrNull: Throwable): Unit = {}
>
>   override def open(partitionId: Long, version: Long): Boolean = {
>     true
>   }
>
> }
>
>
>
>
>
> *Liana Napalkova, PhD*
>
> *Big Data Analytics Unit*