Well, the code itself doesn't look right: StateUpdateTask uses ProductStateInformation as the state class, whereas you pass Encoders.bean(ProductSessionInformation.class) as the state encoder.
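Here is a simplified pure-Java model that shows why a layout mismatch like this tends to produce shuffled values instead of an exception. It is not Spark's encoder. It assumes, as Jungtaek suspects below, that stored state is bound to bean properties purely by position. The two beans (WrittenState, ReadState) and the helpers (layout, write, read) are hypothetical names made up for illustration.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT Spark's encoder: a "row" is an Object[] whose slots
// follow the bean's property order, and reading binds slots by position only.
class PositionalRowModel {

    // Read/write bean properties in the order Introspector reports them
    // (the read-only "class" property is skipped because it has no setter).
    static List<PropertyDescriptor> layout(Class<?> beanClass) {
        try {
            List<PropertyDescriptor> props = new ArrayList<>();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    props.add(pd);
                }
            }
            return props;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    // "Serialize" a bean: slot i holds the value of property i.
    static Object[] write(Object bean) {
        List<PropertyDescriptor> props = layout(bean.getClass());
        Object[] row = new Object[props.size()];
        try {
            for (int i = 0; i < row.length; i++) {
                row[i] = props.get(i).getReadMethod().invoke(bean);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return row;
    }

    // "Deserialize" into a possibly different class: slot i goes to property i,
    // whether or not the property names match.
    static <T> T read(Object[] row, Class<T> beanClass) {
        List<PropertyDescriptor> props = layout(beanClass);
        try {
            T bean = beanClass.getDeclaredConstructor().newInstance();
            for (int i = 0; i < props.size() && i < row.length; i++) {
                props.get(i).getWriteMethod().invoke(bean, row[i]);
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical bean used by the run that wrote the state.
    public static class WrittenState {
        private long firstEventTimeMillis;
        private long firstReceivedTimeMillis;
        public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
        public void setFirstEventTimeMillis(long v) { firstEventTimeMillis = v; }
        public long getFirstReceivedTimeMillis() { return firstReceivedTimeMillis; }
        public void setFirstReceivedTimeMillis(long v) { firstReceivedTimeMillis = v; }
    }

    // Hypothetical bean used by the run that reads the state back.
    public static class ReadState {
        private long firstReceivedTimeMillis;
        private long lastEventTimeMillis;
        public long getFirstReceivedTimeMillis() { return firstReceivedTimeMillis; }
        public void setFirstReceivedTimeMillis(long v) { firstReceivedTimeMillis = v; }
        public long getLastEventTimeMillis() { return lastEventTimeMillis; }
        public void setLastEventTimeMillis(long v) { lastEventTimeMillis = v; }
    }
}
```

Suppose a WrittenState with firstEventTimeMillis=1585312384 is read back as a ReadState. In this toy model, 1585312384 lands in firstReceivedTimeMillis. That swap is similar to the one in the report. With Spark's binary row format, a mismatch on a String column could plausibly produce garbage like the junk requestId instead of a clean swap, but that part is a guess.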
On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> Could you play with Encoders.bean()? You can call Encoders.bean() with your
> class and call .schema() on the return value to see how it is transformed
> into a Spark SQL schema. The schema must be consistent across multiple JVM
> runs to make it work properly, but I suspect it doesn't retain the order.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <srini....@gmail.com> wrote:
>
>> I am listening to a Kafka topic with a Structured Streaming application
>> written in Java, testing it on my local Mac.
>> When I retrieve the GroupState<ProductSessionInformation> object with
>> state.get(), it gives random values for the fields in the object: some
>> are interchanged, some are default and some are junk values.
>>
>> See this example below:
>> When setting the state, I set:
>> ProductSessionInformation{requestId='222112345', productId='222112345',
>> priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it looks like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why this might be happening? I have been stuck on this for a
>> couple of days. Immediate help is appreciated.
>>
>> Code snippet:
>>
>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>
>>     @Override
>>     public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>     {
>>
>>         if (state.hasTimedOut()) {
>>
>>             //....
>>
>>         } else {
>>
>>             if (state.exists()) {
>>                 ProductStateInformation oldSession = state.get();
>>                 System.out.println("State for productId:" + productId + " with old values " + oldSession);
>>
>>             }
>>
>>
>> public class EventsApp implements Serializable {
>>
>>     public void run(String[] args) throws Exception {
>>
>>         ...
>>
>>         Dataset<Row> dataSet = sparkSession
>>             .readStream()
>>             .format("kafka")
>>             .option("kafka.bootstrap.servers", "localhost")
>>             .option("startingOffsets", "latest")
>>             .option("failOnDataLoss", "false")
>>             .option("subscribe", "topic1,topic2")
>>             .option("includeTimestamp", true)
>>             .load();
>>
>>         eventsDS.groupByKey(
>>                 new MapFunction<Event, String>() {
>>                     @Override public String call(Event event) {
>>                         return event.getProductId();
>>                     }
>>                 }, Encoders.STRING())
>>             .mapGroupsWithState(
>>                 new StateUpdateTask(30000),
>>                 Encoders.bean(ProductSessionInformation.class),
>>                 Encoders.bean(ProductSessionUpdate.class),
>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>
>>         ...
>>
>>         StreamingQuery query = productUpdates
>>             .writeStream()
>>             .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>                 @Override
>>                 public boolean open(long l, long l1) { return true; }
>>
>>                 @Override
>>                 public void process(ProductSessionUpdate productSessionUpdate) {
>>                     logger.info("-----> query process: " + productSessionUpdate);
>>                 }
>>
>>                 @Override
>>                 public void close(Throwable throwable) {}
>>             })
>>             .outputMode("update")
>>             .option("checkpointLocation", checkpointDir)
>>             .start();
>>
>>         query.awaitTermination();
>>
>>     }
>>
>> public class ProductStateInformation implements Serializable {
>>
>>     protected String requestId;
>>     protected String productId;
>>     protected String priority;
>>     protected long firstEventTimeMillis;
>>     protected long lastEventTimeMillis;
>>     protected long firstReceivedTimeMillis;
>>     protected int numberOfEvents;
>>
>>     ...//getter setters
>>
>> }
>>
>> These are the versions I am using:
>>
>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>> <spark.version>2.4.3</spark.version>
>> <jackson.version>2.6.6</jackson.version>
>> <kafka.version>0.10.2.0</kafka.version>
>> <kryo.version>3.0.3</kryo.version>
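On the Encoders.bean() suggestion: in Spark, Encoders.bean(ProductStateInformation.class).schema().printTreeString() prints the columns the encoder derives. As far as I know, Spark 2.4 builds that column list with java.beans.Introspector. The plain-Java sketch below approximates it without a Spark dependency. BeanLayoutPreview, describe and StateInfoBean are hypothetical names. StateInfoBean also writes out the getters and setters that the quoted class elides.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation of the column list a bean encoder would derive.
// Assumption: Spark takes the readable JavaBean properties that
// java.beans.Introspector reports, in the order it reports them.
class BeanLayoutPreview {

    // Returns "name: type" for each readable property except getClass().
    static List<String> describe(Class<?> beanClass) {
        List<String> columns = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                if (!"class".equals(pd.getName()) && pd.getReadMethod() != null) {
                    columns.add(pd.getName() + ": " + pd.getPropertyType().getSimpleName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        return columns;
    }

    // Hypothetical mirror of ProductStateInformation; the getters and setters
    // elided in the original ("...//getter setters") are written out here.
    public static class StateInfoBean implements java.io.Serializable {
        protected String requestId;
        protected String productId;
        protected String priority;
        protected long firstEventTimeMillis;
        protected long lastEventTimeMillis;
        protected long firstReceivedTimeMillis;
        protected int numberOfEvents;

        public String getRequestId() { return requestId; }
        public void setRequestId(String v) { requestId = v; }
        public String getProductId() { return productId; }
        public void setProductId(String v) { productId = v; }
        public String getPriority() { return priority; }
        public void setPriority(String v) { priority = v; }
        public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
        public void setFirstEventTimeMillis(long v) { firstEventTimeMillis = v; }
        public long getLastEventTimeMillis() { return lastEventTimeMillis; }
        public void setLastEventTimeMillis(long v) { lastEventTimeMillis = v; }
        public long getFirstReceivedTimeMillis() { return firstReceivedTimeMillis; }
        public void setFirstReceivedTimeMillis(long v) { firstReceivedTimeMillis = v; }
        public int getNumberOfEvents() { return numberOfEvents; }
        public void setNumberOfEvents(int v) { numberOfEvents = v; }
    }
}
```

On the JDKs I'm aware of, Introspector reports properties sorted by name rather than in declaration order, so requestId comes last. Running this against both the state class and the class passed to the state encoder is a quick way to confirm they agree. You can also compare their schema() output in Spark.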