-dev

Have you tried clearing out the checkpoint directory? Could you also share the full stack trace?
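For reference, as far as I know the checkpointLocation option is picked up on the write side (the StreamingQuery), not on readStream(), so setting it on the reader likely has no effect. Below is only a rough sketch reusing the df2 and KafkaSink names from your snippet; the HDFS path is a placeholder, and whether you write df1 or df2 depends on which Dataset you actually intend to output:

// Sketch only: df2 and KafkaSink are taken from the snippet below; the path is a placeholder.
StreamingQuery query = df2.writeStream()
        .foreach(new KafkaSink())
        .outputMode("update")
        // checkpointLocation belongs on the query, not on readStream()
        .option("checkpointLocation", "hdfs://<namenode>/checkpoints/my-query")
        .start();
query.awaitTermination();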
On Wed, May 24, 2017 at 3:45 PM, kant kodali <kanth...@gmail.com> wrote:

> Even if I do a simple count aggregation like below I get the same error as
> https://issues.apache.org/jira/browse/SPARK-19268
>
> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>
>
> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using Spark 2.1.1, running in Standalone mode with HDFS and Kafka.
>>
>> I am running into the same problem as https://issues.apache.org/jira/browse/SPARK-19268 with my app (not KafkaWordCount).
>>
>> Here is my sample code.
>>
>> *Here is how I create the ReadStream*
>>
>> sparkSession.readStream()
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>     .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>     .option("startingOffsets", "earliest")
>>     .option("failOnDataLoss", "false")
>>     .option("checkpointLocation", hdfsCheckPointDir)
>>     .load();
>>
>>
>> *The core logic*
>>
>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>> query.awaitTermination();
>>
>>
>> I can also provide any other information you may need.
>>
>> Thanks!
>>
>