Hi All,

I am using Spark 2.1.1, running in standalone mode with HDFS and Kafka.

I am running into the same problem as
https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
KafkaWordCount).

Here is my sample code.

*Here is how I create the ReadStream*

sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
        .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .option("checkpointLocation", hdfsCheckPointDir)
        .load();
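While writing this up I noticed the structured streaming guide lists "checkpointLocation" as a sink (writeStream) option, and it appears to be ignored when set on readStream. Here is a sketch of the write side with the option moved there; "outputDf" is just a placeholder for the Dataset being written, and KafkaSink and hdfsCheckPointDir are the same names as in my code:

```java
// Hypothetical write side: "checkpointLocation" is a sink option in
// structured streaming, so it is set on writeStream here rather than
// on readStream. "outputDf" stands in for the Dataset being written.
StreamingQuery query = outputDf.writeStream()
        .foreach(new KafkaSink())
        .outputMode("update")
        .option("checkpointLocation", hdfsCheckPointDir)
        .start();
query.awaitTermination();
```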


*The core logic*

Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).outputMode("update").start();
query.awaitTermination();
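One more thing in case it is related: with 24-hour tumbling windows in update mode, old window state is kept indefinitely unless a watermark bounds it. Here is a sketch of the same aggregation with a watermark on the event-time column (the 25-hour delay threshold is just an assumption, not a recommendation):

```java
// Sketch: bound the streaming state with a watermark on the
// event-time column before the windowed aggregation. The 25-hour
// threshold is an assumption; col() and friends come from
// org.apache.spark.sql.functions.
Dataset<Row> df2 = df1
        .withWatermark("Timestamp5", "25 hours")
        .groupBy(window(col("Timestamp5"), "24 hours", "24 hours"),
                 col("AppName"))
        .agg(sum("Amount"));
```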


I can also provide any other information you may need.

Thanks!
