Well, the code itself doesn't look right: you're using
ProductStateInformation as the state class in StateUpdateTask, whereas you
provide ProductSessionInformation to the Encoder for the state.
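
If it helps to compare the two classes without starting Spark, here is a
minimal JDK-only sketch that lists the getter/setter properties of a bean
class using java.beans.Introspector. StateBean and BeanPropertyDump are
hypothetical names, and StateBean is a cut-down stand-in with only three of
your fields. That Encoders.bean() builds its schema from these same bean
properties is my assumption, so treat the output as a hint rather than as the
actual Spark schema.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyDump {

    // Hypothetical stand-in for the state class, reduced to three fields.
    public static class StateBean {
        private String productId;
        private long firstEventTimeMillis;
        private int numberOfEvents;

        public String getProductId() { return productId; }
        public void setProductId(String productId) { this.productId = productId; }
        public long getFirstEventTimeMillis() { return firstEventTimeMillis; }
        public void setFirstEventTimeMillis(long v) { this.firstEventTimeMillis = v; }
        public int getNumberOfEvents() { return numberOfEvents; }
        public void setNumberOfEvents(int n) { this.numberOfEvents = n; }
    }

    // Returns "name:type" for every property that has both a getter and a
    // setter, in whatever order the Introspector reports them.
    public static List<String> describe(Class<?> beanClass) {
        try {
            // Object.class as the stop class leaves out the "class" property.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> out = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    out.add(pd.getName() + ":" + pd.getPropertyType().getSimpleName());
                }
            }
            return out;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        for (String property : describe(StateBean.class)) {
            System.out.println(property);
        }
    }
}
```

Running describe() on both ProductStateInformation and
ProductSessionInformation and comparing the two lists should show quickly
whether the class you read the state as and the class you gave to the encoder
expose the same properties.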

On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Could you play with Encoders.bean()? You can call Encoders.bean() with your
> class, and call .schema() on the return value to see how it is transformed
> into a Spark SQL schema. The schema must be consistent across multiple JVM
> runs to work properly, but I suspect it doesn't retain the field order.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <srini....@gmail.com> wrote:
>
>> I am listening to a Kafka topic with a Structured Streaming application
>> written in Java, testing it on my local Mac.
>> When I retrieve the GroupState<ProductSessionInformation> object with
>> state.get(), it gives seemingly random values for the fields in the object:
>> some are swapped, some are defaults, and some are junk values.
>>
>> See the example below.
>> This is what I set:
>> ProductSessionInformation{requestId='222112345', productId='222112345',
>> priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it comes like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why this might be happening? I have been stuck on this for a
>> couple of days. Immediate help is appreciated.
>>
>> code snippet:
>>
>>
>> public class StateUpdateTask implements
>>         MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>
>>     @Override
>>     public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator,
>>             GroupState<ProductStateInformation> state) throws Exception {
>>
>>         if (state.hasTimedOut()) {
>>             //....
>>         } else {
>>             if (state.exists()) {
>>                 ProductStateInformation oldSession = state.get();
>>                 System.out.println("State for productId:" + productId
>>                         + " with old values " + oldSession);
>>             }
>>
>>
>> public class EventsApp implements Serializable{
>>
>> public void run(String[] args) throws Exception {
>>
>> ...
>>
>>
>> Dataset<Row> dataSet = sparkSession
>>         .readStream()
>>         .format("kafka")
>>         .option("kafka.bootstrap.servers", "localhost")
>>         .option("startingOffsets","latest")
>>         .option("failOnDataLoss", "false")
>>         .option("subscribe", "topic1,topic2")
>>         .option("includeTimestamp", true)
>>
>>         .load();
>>
>>  eventsDS.groupByKey(
>>                 new MapFunction<Event, String>() {
>>                     @Override public String call(Event event) {
>>                         return event.getProductId();
>>                     }
>>                 }, Encoders.STRING())
>>         .mapGroupsWithState(
>>                 new StateUpdateTask(30000),
>>                 Encoders.bean(ProductSessionInformation.class),
>>                 Encoders.bean(ProductSessionUpdate.class),
>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>
>> ...
>>
>>
>> StreamingQuery query = productUpdates
>>         .writeStream()
>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>             @Override
>>             public boolean open(long l, long l1) {return true;}
>>
>>             @Override
>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>             }
>>
>>             @Override
>>             public void close(Throwable throwable) {}
>>         })
>>         .outputMode("update")
>>         .option("checkpointLocation", checkpointDir)
>>         .start();
>>
>> query.awaitTermination();
>>
>> }
>>
>> public class ProductStateInformation implements Serializable {
>>
>>     protected String requestId;
>>     protected String productId;
>>     protected String priority;
>>     protected long firstEventTimeMillis;
>>     protected long lastEventTimeMillis;
>>     protected long firstReceivedTimeMillis;
>>     protected int numberOfEvents;
>>
>> ...//getter setters
>>
>> }
>>
>> These are the versions I am using:
>>
>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>> <spark.version>2.4.3</spark.version>
>>
>> <jackson.version>2.6.6</jackson.version>
>> <kafka.version>0.10.2.0</kafka.version>
>>
>> <kryo.version>3.0.3</kryo.version>
>>
>>
