Hi,
I am wondering how to properly execute `foreachPartition` in Spark 2.2. Below I explain the problem in detail. I would appreciate any help.

In Spark 1.6 I was doing something similar to this:

    DstreamFromKafka.foreachRDD(session => {
      session.foreachPartition { partitionOfRecords =>
        println("Setting the producer.")
        // One producer per partition, created on the executor that processes it
        val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                            mySet.value("batchSize"),
                                            mySet.value("lingerMS"))
        partitionOfRecords.foreach(s => {
          //...
        })
      }
    })

However, I cannot find the proper way to do something similar in Spark 2.2. I tried to write my own class by extending `ForeachWriter`, but I get a task serialization error when passing a `KafkaProducer` to it:

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}

    class MyTestClass(
        // val inputparams: String
        ) extends Serializable {

      val spark = SparkSession
        .builder()
        .appName("TEST")
        //.config("spark.sql.warehouse.dir", kafkaData)
        .enableHiveSupport()
        .getOrCreate()

      import spark.implicits._

      val df: Dataset[String] = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "true")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] // Kafka sends bytes
        .map(_._2)

      val producer: KafkaProducer[String, String] = ??? // create KafkaProducer

      val writer = new MyForeachWriter(producer)

      val query = df
        .writeStream
        .foreach(writer)
        .start()

      query.awaitTermination()

      spark.stop()
    }

    class MyForeachWriter extends ForeachWriter[String] with Serializable {

      var producer: KafkaProducer[String, String] = _

      def this(producer: KafkaProducer[String, String]) {
        this()
        this.producer = producer
      }

      override def process(row: String): Unit = {
        // ...
      }

      override def close(errorOrNull: Throwable): Unit = {}

      override def open(partitionId: Long, version: Long): Boolean = {
        true
      }
    }

Liana Napalkova, PhD
Big Data Analytics Unit
T +34 93 238 14 00 (ext.
1248) M +34 633 426 677 liana.napalk...@eurecat.org
Carrer Camí Antic de València 54-56, Edifici A - 08005 - Barcelona
www.eurecat.org
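A common way around this kind of serialization error, sketched here under assumptions and not tested against this exact setup, is to avoid shipping the producer at all. `KafkaProducer` is not serializable, and a `ForeachWriter` is serialized on the driver and sent to the executors, so a writer that holds a producer in a regular field cannot be shipped. The writer below instead carries only plain `String` settings and builds the producer in `open()`, which runs on the executor, then closes it in `close()`. The class name `KafkaForeachWriter` and its `brokers`/`topic` parameters are hypothetical; `bootstrap.servers`, `key.serializer` and `value.serializer` are standard Kafka producer configs.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.ForeachWriter

// Hypothetical writer: only serializable settings (two Strings) travel with it;
// the non-serializable KafkaProducer is created on the executor in open().
class KafkaForeachWriter(brokers: String, topic: String)
    extends ForeachWriter[String] {

  // @transient: never serialized; built per partition in open()
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    // batch.size / linger.ms could be added here, as in the Spark 1.6 helper
    producer = new KafkaProducer[String, String](props)
    true // true means: process the rows of this partition
  }

  override def process(value: String): Unit = {
    // Asynchronous send; records are buffered by the producer
    producer.send(new ProducerRecord[String, String](topic, value))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) {
      producer.close() // blocks until previously sent records complete
      producer = null
    }
  }
}
```

Because `open()` is called for each partition in every trigger, this creates one producer per partition per micro-batch, much like the Spark 1.6 code did inside `foreachPartition`. If that turns out to be too costly, one option is to cache a single producer per executor JVM, for example in a Scala `object` with a `lazy val`. Usage would look like `df.writeStream.foreach(new KafkaForeachWriter("localhost:9092", "output-topic")).start()`, where `"output-topic"` is a placeholder.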