I have the following code to read and process Kafka data using Structured
Streaming:

  
        /** Small Structured Streaming job that reads records from Kafka and prepares them for a sink. */
        object ETLTest {

          /** Typed view of a Kafka row: the `value` and `topic` columns, both cast to strings. */
          case class record(value: String, topic: String)

          /** Entry point; command-line arguments are ignored. */
          def main(args: Array[String]): Unit = {
            run()
          }

          /**
           * Builds a local SparkSession, defines the Kafka source and a typed,
           * null-filtered Dataset over it. Both sink variants are left commented
           * out, exactly as in the original snippet, so no query is started.
           */
          def run(): Unit = {

            val spark = SparkSession
              .builder
              .appName("Test JOB")
              .master("local[*]")
              .getOrCreate()

            // Required for `.as[record]` (implicit Encoder) and the `$"..."` column syntax below.
            import spark.implicits._

            val kafkaStreamingDF = spark
              .readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "...")
              .option("subscribe", "...")
              .option("failOnDataLoss", "false")
              .option("startingOffsets", "earliest")
              .load()
              // Each expression must be a single-line string literal; the original
              // snippet had the second one broken across two lines.
              .selectExpr(
                "CAST(value as STRING)",
                "CAST(timestamp as STRING)",
                "CAST(topic as STRING)"
              )

            // Sink that prints every record it is handed.
            val sdvWriter = new ForeachWriter[record] {
              // Always returns true, for every partition/version pair.
              def open(partitionId: Long, version: Long): Boolean = {
                true
              }
              def process(record: record): Unit = {
                println("record:: " + record)
              }
              // Nothing to release.
              def close(errorOrNull: Throwable): Unit = {}
            }

            val sdvDF = kafkaStreamingDF
              .as[record]
              .filter($"value".isNotNull)

            // DOES NOT WORK (author's observation: no records are shown)
            /*val query = sdvDF
                .writeStream
                .format("console")
                .start()
                .awaitTermination()*/

            // WORKS (author's observation: records are printed)
            /*val query = sdvDF
              .writeStream
              .foreach(sdvWriter)
              .start()
              .awaitTermination()
              */

          }

        }

I am running this code from the IntelliJ IDEA IDE. When I use
foreach(sdvWriter), I can see the records consumed from Kafka, but when I
use .writeStream.format("console"), I do not see any records. I assume that
the console write stream is maintaining some sort of checkpoint and assumes
it has processed all the records. Is that the case? Am I missing something
obvious here?




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to