Hi all, I am using Spark 2.1.1, running in standalone mode with HDFS and Kafka.
I am running into the same problem described in https://issues.apache.org/jira/browse/SPARK-19268 with my app (not KafkaWordCount). Here is a sample of my code.

*Here is how I create the ReadStream:*

sparkSession.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
    .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .option("checkpointLocation", hdfsCheckPointDir)
    .load();

*The core logic:*

Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
query.awaitTermination();

I can provide any other information you may need. Thanks!