I have the following code to read and process Kafka data using Structured Streaming:
import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    // Needed for the $"..." column syntax and the Dataset encoder below.
    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)", "CAST(topic as STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = true
      def process(record: record): Unit = println("record:: " + record)
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*
    val query = sdvDF
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
    */

    // WORKS
    /*
    val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
    */
  }
}

I am running this code from the IntelliJ IDEA IDE. When I use foreach(sdvWriter), I can see the records consumed from Kafka, but when I use .writeStream.format("console") I do not see any records. I assume the console sink is maintaining some sort of checkpoint and thinks it has already processed all the records. Is that the case? Am I missing something obvious here?
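For reference, one way to rule out stale checkpoint state would be to start the console sink with an explicit, fresh checkpoint directory. This is just a sketch; the /tmp path below is a placeholder, not something from my actual job:

    // Sketch: point the console sink at a fresh checkpoint directory so any
    // previously committed offsets cannot be picked up. The path is a placeholder.
    val query = sdvDF
      .writeStream
      .format("console")
      .option("truncate", "false")                                  // print full column values
      .option("checkpointLocation", "/tmp/console-sink-checkpoint") // placeholder path
      .start()

    query.awaitTermination()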